From 07d7859a690d42a2997df722237d0bbc9f8394ee Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 14 Jun 2024 15:44:33 +0200 Subject: [PATCH] Enhancement for integration with waku-simulator --- apps/liteprotocoltester/.env | 12 + .../diagnose_connections.nim | 96 ++++++++ .../docker-compose-on-simularor.yml | 212 ++++++++++++++++++ apps/liteprotocoltester/filter_subscriber.nim | 9 +- .../lightpush_publisher.nim | 66 +++++- .../liteprotocoltester/liteprotocoltester.nim | 14 +- apps/liteprotocoltester/run_service_node.sh | 25 ++- apps/liteprotocoltester/run_tester_node.sh | 54 +++-- apps/liteprotocoltester/tester_config.nim | 25 ++- apps/liteprotocoltester/tester_message.nim | 13 ++ 10 files changed, 482 insertions(+), 44 deletions(-) create mode 100644 apps/liteprotocoltester/.env create mode 100644 apps/liteprotocoltester/diagnose_connections.nim create mode 100644 apps/liteprotocoltester/docker-compose-on-simularor.yml diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env new file mode 100644 index 0000000000..e4359cade2 --- /dev/null +++ b/apps/liteprotocoltester/.env @@ -0,0 +1,12 @@ +NWAKU_IMAGE=quay.io/wakuorg/nwaku-pr:2800-rln-v2 + +START_PUBLISHING_AFTER=30 # seconds +NUM_MESSAGES=250 +DELAY_MESSAGES=200 # gap between messages + +MIN_MESSAGE_SIZE=1Kb +MAX_MESSAGE_SIZE=120Kb + +# PUBSUB=/waku/2/rs/66/0 +# CONTENT_TOPIC=/tester/1/light-pubsub-example/proto + diff --git a/apps/liteprotocoltester/diagnose_connections.nim b/apps/liteprotocoltester/diagnose_connections.nim new file mode 100644 index 0000000000..4973365fb4 --- /dev/null +++ b/apps/liteprotocoltester/diagnose_connections.nim @@ -0,0 +1,96 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[options, strutils, os, sequtils, net, strformat], + chronicles, + chronos, + metrics, + libbacktrace, + system/ansi_c, + libp2p/crypto/crypto, + confutils, + libp2p/wire + +import + ../../waku/common/logging, + ../../waku/factory/waku, + ../../waku/factory/external_config, + ../../waku/node/health_monitor, + ../../waku/node/waku_metrics, + ../../waku/waku_api/rest/builder as rest_server_builder, + ../../waku/node/peer_manager, + ../../waku/waku_lightpush/common, + ../../waku/waku_relay, + ../../waku/waku_filter_v2, + ../../waku/waku_api/rest/client, + ../../waku/waku_api/rest/admin/client, + ./tester_config, + ./lightpush_publisher, + ./filter_subscriber + +logScope: + topics = "diagnose connections" + +proc logSelfPeersLoop(pm: PeerManager, interval: Duration) {.async.} = + trace "Starting logSelfPeersLoop diagnosys loop" + while true: + let selfLighpushPeers = pm.peerStore.getPeersByProtocol(WakuLightPushCodec) + let selfRelayPeers = pm.peerStore.getPeersByProtocol(WakuRelayCodec) + let selfFilterPeers = pm.peerStore.getPeersByProtocol(WakuFilterSubscribeCodec) + + let printable = catch: + """*------------------------------------------------------------------------------------------* +| Self ({pm.switch.peerInfo}) peers: +*------------------------------------------------------------------------------------------* +| Lightpush peers({selfLighpushPeers.len()}): ${selfLighpushPeers} +*------------------------------------------------------------------------------------------* +| Filter peers({selfFilterPeers.len()}): ${selfFilterPeers} +*------------------------------------------------------------------------------------------* +| Relay peers({selfRelayPeers.len()}): ${selfRelayPeers} +*------------------------------------------------------------------------------------------*""".fmt() + + if printable.isErr(): + echo "Error while printing statistics: " & printable.error().msg + else: + echo printable.get() + + await sleepAsync(interval) + +proc logServiceRelayPeers( + pm: PeerManager, codec: string, interval: Duration +) {.async.} = + trace "Starting service node connectivity diagnosys loop" + while true: + echo "*------------------------------------------------------------------------------------------*" + echo "| Service peer connectivity:" + let selfLighpushPeers = pm.selectPeer(codec) + if selfLighpushPeers.isSome(): + let ma = selfLighpushPeers.get().addrs[0] + var serviceIp = initTAddress(ma).valueOr: + echo "Error while parsing multiaddress: " & $error + continue + + serviceIp.port = Port(8645) + let restClient = newRestHttpClient(initTAddress($serviceIp)) + + let getPeersRes = await restClient.getPeers() + + if getPeersRes.status == 200: + let nrOfPeers = getPeersRes.data.len() + echo "Service node (@" & $ma & ") peers: " & $getPeersRes.data + else: + echo "Error while fetching service node (@" & $ma & ") peers: " & + $getPeersRes.data + else: + echo "No service node peers found" + + echo "*------------------------------------------------------------------------------------------*" + + await sleepAsync(interval) + +proc startPeriodicPeerDiagnostic*(pm: PeerManager, codec: string) {.async.} = + asyncSpawn logSelfPeersLoop(pm, chronos.seconds(20)) + asyncSpawn logServiceRelayPeers(pm, codec, chronos.seconds(20)) diff --git a/apps/liteprotocoltester/docker-compose-on-simularor.yml b/apps/liteprotocoltester/docker-compose-on-simularor.yml new file mode 100644 index 0000000000..74fc246935 --- /dev/null +++ b/apps/liteprotocoltester/docker-compose-on-simularor.yml @@ -0,0 +1,212 @@ +version: "3.7" +x-logging: &logging + logging: + driver: json-file + options: + max-size: 1000m + +# Environment variable definitions +x-eth-client-address: ð_client_address ${ETH_CLIENT_ADDRESS:-} # Add your ETH_CLIENT_ADDRESS after the "-" + +x-rln-environment: &rln_env + RLN_RELAY_CONTRACT_ADDRESS: ${RLN_RELAY_CONTRACT_ADDRESS:-0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4} + RLN_RELAY_CRED_PATH: ${RLN_RELAY_CRED_PATH:-} # Optional: Add your RLN_RELAY_CRED_PATH after the "-" + RLN_RELAY_CRED_PASSWORD: ${RLN_RELAY_CRED_PASSWORD:-} # Optional: Add your RLN_RELAY_CRED_PASSWORD after the "-" + +x-test-running-conditions: &test_running_conditions + NUM_MESSAGES: ${NUM_MESSAGES:-120} + DELAY_MESSAGES: "${DELAY_MESSAGES:-1000}" + PUBSUB: ${PUBSUB:-} + CONTENT_TOPIC: ${CONTENT_TOPIC:-} + MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb} + MAX_MESSAGE_SIZE: ${MAX_MESSAGE_SIZE:-150Kb} + START_PUBLISHING_AFTER: ${START_PUBLISHING_AFTER:-5} # seconds + + +# Services definitions +services: + lightpush-service: + image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest} + # ports: + # - 30304:30304/tcp + # - 30304:30304/udp + # - 9005:9005/udp + # - 127.0.0.1:8003:8003 + # - 80:80 #Let's Encrypt + # - 8000:8000/tcp #WSS + # - 127.0.0.1:8645:8645 + <<: + - *logging + environment: + DOMAIN: ${DOMAIN} + RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}" + ETH_CLIENT_ADDRESS: *eth_client_address + EXTRA_ARGS: ${EXTRA_ARGS} + <<: + - *rln_env + volumes: + - ./run_service_node.sh:/opt/run_service_node.sh:Z + - ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z + - ./rln_tree:/etc/rln_tree/:Z + - ./keystore:/keystore:Z + entrypoint: sh + command: + - /opt/run_service_node.sh + - LIGHTPUSH + networks: + - waku-simulator_simulation + + publishernode: + image: waku.liteprotocoltester:latest + build: + context: ../.. + dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy + deploy: + replicas: ${NUM_PUBLISHER_NODES:-3} + # ports: + # - 30304:30304/tcp + # - 30304:30304/udp + # - 9005:9005/udp + # - 127.0.0.1:8003:8003 + # - 80:80 #Let's Encrypt + # - 8000:8000/tcp #WSS + # - 127.0.0.1:8646:8646 + <<: + - *logging + environment: + DOMAIN: ${DOMAIN} + RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}" + ETH_CLIENT_ADDRESS: *eth_client_address + EXTRA_ARGS: ${EXTRA_ARGS} + <<: + - *rln_env + - *test_running_conditions + volumes: + - ./run_tester_node.sh:/opt/run_tester_node.sh:Z + - ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z + - ./rln_tree:/etc/rln_tree/:Z + - ./keystore:/keystore:Z + entrypoint: sh + command: + - /opt/run_tester_node.sh + - SENDER + depends_on: + - lightpush-service + configs: + - source: cfg_tester_node.toml + target: config.toml + networks: + - waku-simulator_simulation + + filter-service: + image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest} + # ports: + # - 30304:30305/tcp + # - 30304:30305/udp + # - 9005:9005/udp + # - 127.0.0.1:8003:8003 + # - 80:80 #Let's Encrypt + # - 8000:8000/tcp #WSS + # - 127.0.0.1:8645:8645 + <<: + - *logging + environment: + DOMAIN: ${DOMAIN} + RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}" + ETH_CLIENT_ADDRESS: *eth_client_address + EXTRA_ARGS: ${EXTRA_ARGS} + <<: + - *rln_env + volumes: + - ./run_service_node.sh:/opt/run_service_node.sh:Z + - ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z + - ./rln_tree:/etc/rln_tree/:Z + - ./keystore:/keystore:Z + entrypoint: sh + command: + - /opt/run_service_node.sh + - FILTER + networks: + - waku-simulator_simulation + + + receivernode: + image: waku.liteprotocoltester:latest + build: + context: ../.. + dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy + deploy: + replicas: ${NUM_RECEIVER_NODES:-1} + # ports: + # - 30304:30304/tcp + # - 30304:30304/udp + # - 9005:9005/udp + # - 127.0.0.1:8003:8003 + # - 80:80 #Let's Encrypt + # - 8000:8000/tcp #WSS + # - 127.0.0.1:8647:8647 + <<: + - *logging + environment: + DOMAIN: ${DOMAIN} + RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}" + ETH_CLIENT_ADDRESS: *eth_client_address + EXTRA_ARGS: ${EXTRA_ARGS} + <<: + - *rln_env + - *test_running_conditions + volumes: + - ./run_tester_node.sh:/opt/run_tester_node.sh:Z + - ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z + - ./rln_tree:/etc/rln_tree/:Z + - ./keystore:/keystore:Z + entrypoint: sh + command: + - /opt/run_tester_node.sh + - RECEIVER + depends_on: + - filter-service + - publishernode + configs: + - source: cfg_tester_node.toml + target: config.toml + networks: + - waku-simulator_simulation + + ## We have prometheus and grafana defined in waku-simulator already + # prometheus: + # image: docker.io/prom/prometheus:latest + # volumes: + # - ./monitoring/prometheus-config.yml:/etc/prometheus/prometheus.yml:Z + # command: + # - --config.file=/etc/prometheus/prometheus.yml + # ports: + # - 127.0.0.1:9090:9090 + # depends_on: + # - servicenode + + # grafana: + # image: docker.io/grafana/grafana:latest + # env_file: + # - ./monitoring/configuration/grafana-plugins.env + # volumes: + # - ./monitoring/configuration/grafana.ini:/etc/grafana/grafana.ini:Z + # - ./monitoring/configuration/dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml:Z + # - ./monitoring/configuration/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:Z + # - ./monitoring/configuration/dashboards:/var/lib/grafana/dashboards/:Z + # - ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_icon.svg:Z + # - ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_typelogo.svg:Z + # - ./monitoring/configuration/customizations/custom-logo.png:/usr/share/grafana/public/img/fav32.png:Z + # ports: + # - 0.0.0.0:3000:3000 + # depends_on: + # - prometheus + +configs: + cfg_tester_node.toml: + content: | + max-connections = 100 + +networks: + waku-simulator_simulation: + external: true diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/filter_subscriber.nim index ee77708451..11e041da9b 100644 --- a/apps/liteprotocoltester/filter_subscriber.nim +++ b/apps/liteprotocoltester/filter_subscriber.nim @@ -85,12 +85,17 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = stats.addMessage(testerMessage.sender, testerMessage) - trace "message received", + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex + + notice "message received", index = testerMessage.index, count = testerMessage.count, startedAt = $testerMessage.startedAt, sinceStart = $testerMessage.sinceStart, - sincePrev = $testerMessage.sincePrev + sincePrev = $testerMessage.sincePrev, + size = $testerMessage.size, + pubsubTopic = pubsubTopic, + hash = msgHash wakuNode.wakuFilterClient.registerPushHandler(pushHandler) diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim index 0dd0b56fe2..b6459ec783 100644 --- a/apps/liteprotocoltester/lightpush_publisher.nim +++ b/apps/liteprotocoltester/lightpush_publisher.nim @@ -1,5 +1,5 @@ import - std/strformat, + std/[strformat, sysrand, random, sequtils], system/ansi_c, chronicles, chronos, @@ -12,16 +12,27 @@ import ../../../waku/node/peer_manager, ../../../waku/waku_core, ../../../waku/waku_lightpush/client, + ../../../waku/common/utils/parse_size_units, ./tester_config, ./tester_message +randomize() + +type SizeRange* = tuple[min: uint64, max: uint64] + +var RANDOM_PALYLOAD {.threadvar.}: seq[byte] +RANDOM_PALYLOAD = urandom(1024 * 1024) + # 1MiB of random payload to be used to extend message + proc prepareMessage( sender: string, messageIndex, numMessages: uint32, startedAt: TimeStamp, prevMessageAt: var Timestamp, contentTopic: ContentTopic, -): WakuMessage = + size: SizeRange, +): (WakuMessage, uint64) = + var renderSize = rand(size.min .. size.max) let current = getNowInNanosecondTime() let payload = ProtocolTesterMessage( sender: sender, @@ -30,49 +41,72 @@ proc prepareMessage( startedAt: startedAt, sinceStart: current - startedAt, sincePrev: current - prevMessageAt, + size: renderSize, ) prevMessageAt = current let text = js.Json.encode(payload) + let contentPayload = toBytes(text & " \0") + + if renderSize < len(contentPayload).uint64: + renderSize = len(contentPayload).uint64 + + let finalPayload = concat( + contentPayload, RANDOM_PALYLOAD[0 .. renderSize - len(contentPayload).uint64] + ) let message = WakuMessage( - payload: toBytes(text), # content of the message + payload: finalPayload, # content of the message contentTopic: contentTopic, # content topic to publish to ephemeral: true, # tell store nodes to not store it timestamp: current, # current timestamp ) - return message + return (message, renderSize) proc publishMessages( wakuNode: WakuNode, lightpushPubsubTopic: PubsubTopic, lightpushContentTopic: ContentTopic, numMessages: uint32, + messageSizeRange: SizeRange, delayMessages: Duration, ) {.async.} = let startedAt = getNowInNanosecondTime() var prevMessageAt = startedAt var failedToSendCount: uint32 = 0 + var renderMsgSize = messageSizeRange + # sets some default of min max message size to avoid conflict with meaningful payload size + renderMsgSize.min = max(1024.uint64, renderMsgSize.min) # do not use less than 1KB + renderMsgSize.max = max(2048.uint64, renderMsgSize.max) # minimum of max is 2KB + renderMsgSize.min = min(renderMsgSize.min, renderMsgSize.max) + renderMsgSize.max = max(renderMsgSize.min, renderMsgSize.max) let selfPeerId = $wakuNode.switch.peerInfo.peerId var messagesSent: uint32 = 1 while numMessages >= messagesSent: - let message = prepareMessage( + let (message, msgSize) = prepareMessage( selfPeerId, messagesSent, numMessages, startedAt, prevMessageAt, - lightpushContentTopic, + lightpushContentTopic, renderMsgSize, ) let wlpRes = await wakuNode.lightpushPublish(some(lightpushPubsubTopic), message) + let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex + if wlpRes.isOk(): - info "published message using lightpush", - index = messagesSent, count = numMessages + notice "published message using lightpush", + index = messagesSent, + count = numMessages, + size = msgSize, + pubsubTopic = lightpushPubsubTopic, + hash = msgHash else: - error "failed to publish message using lightpush", err = wlpRes.error + error "failed to publish message using lightpush", + err = wlpRes.error, hash = msgHash inc(failedToSendCount) - await sleepAsync(delayMessages) # Publish every 5 seconds + await sleepAsync(delayMessages) inc(messagesSent) let report = catch: @@ -94,8 +128,15 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = return # give some time to receiver side to set up - # TODO: this maybe done in more sphisticated way, though. - let waitTillStartTesting = 5.seconds + let waitTillStartTesting = conf.startPublishingAfter.seconds + + let parsedMinMsgSize = parseMsgSize(conf.minTestMessageSize).valueOr: + error "failed to parse 'min-test-msg-size' param: ", error = error + return + + let parsedMaxMsgSize = parseMsgSize(conf.maxTestMessageSize).valueOr: + error "failed to parse 'max-test-msg-size' param: ", error = error + return info "Sending test messages in", wait = waitTillStartTesting waitFor sleepAsync(waitTillStartTesting) @@ -108,5 +149,6 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = conf.pubsubTopics[0], conf.contentTopics[0], conf.numMessages, + (min: parsedMinMsgSize, max: parsedMaxMsgSize), conf.delayMessages.milliseconds, ) diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim index 3f09eb31db..bff404d56e 100644 --- a/apps/liteprotocoltester/liteprotocoltester.nim +++ b/apps/liteprotocoltester/liteprotocoltester.nim @@ -20,9 +20,12 @@ import ../../waku/node/health_monitor, ../../waku/node/waku_metrics, ../../waku/waku_api/rest/builder as rest_server_builder, + ../../waku/waku_lightpush/common, + ../../waku/waku_filter_v2, ./tester_config, ./lightpush_publisher, - ./filter_subscriber + ./filter_subscriber, + ./diagnose_connections logScope: topics = "liteprotocoltester main" @@ -84,7 +87,7 @@ when isMainModule: wakuConf.logFormat = conf.logFormat wakuConf.staticNodes = @[conf.serviceNode] wakuConf.nat = conf.nat - wakuConf.maxConnections = 100 + wakuConf.maxConnections = 500 wakuConf.restAddress = conf.restAddress wakuConf.restPort = conf.restPort wakuConf.restAllowOrigin = conf.restAllowOrigin @@ -106,6 +109,9 @@ when isMainModule: wakuConf.rest = true + wakuConf.metricsServer = true + wakuConf.metricsServerAddress = parseIpAddress("0.0.0.0") + # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it # It will always be called from main thread anyway. # Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety @@ -187,8 +193,12 @@ when isMainModule: info "Node setup complete" if conf.testFunc == TesterFunctionality.SENDER: + waitFor startPeriodicPeerDiagnostic(wakuApp.node.peerManager, WakuLightPushCodec) setupAndPublish(wakuApp.node, conf) else: + waitFor startPeriodicPeerDiagnostic( + wakuApp.node.peerManager, WakuFilterSubscribeCodec + ) setupAndSubscribe(wakuApp.node, conf) runForever() diff --git a/apps/liteprotocoltester/run_service_node.sh b/apps/liteprotocoltester/run_service_node.sh index 5cce31a95e..7a029434d1 100644 --- a/apps/liteprotocoltester/run_service_node.sh +++ b/apps/liteprotocoltester/run_service_node.sh @@ -5,6 +5,23 @@ IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/') echo "Service node IP: ${IP}" +RETRIES=${RETRIES:=10} + +while [ -z "${BOOTSTRAP_ENR}" ] && [ ${RETRIES} -ge 0 ]; do + BOOTSTRAP_ENR=$(wget -qO- http://bootstrap:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"enrUri":"\([^"]*\)".*/\1/'); + echo "Bootstrap node not ready, retrying (retries left: ${RETRIES})" + sleep 1 + RETRIES=$(( $RETRIES - 1 )) +done + +if [ -z "${BOOTSTRAP_ENR}" ]; then + echo "Could not get BOOTSTRAP_ENR and none provided. Failing" + exit 1 +fi + +echo "Using bootstrap node: ${BOOTSTRAP_ENR}" + + exec /usr/bin/wakunode\ --relay=true\ --filter=true\ @@ -20,10 +37,10 @@ exec /usr/bin/wakunode\ --dns-discovery=true\ --discv5-discovery=true\ --discv5-enr-auto-update=True\ - --log-level=DEBUG\ + --discv5-bootstrap-node=${BOOTSTRAP_ENR}\ + --log-level=INFO\ --metrics-server=True\ --metrics-server-address=0.0.0.0\ - --nodekey=e3f5e64568b3a612dee609f6e7c0203c501dab6131662922bdcbcabd474281d5\ --nat=extip:${IP}\ - --pubsub-topic=/waku/2/default-waku/proto\ - --cluster-id=0 + --pubsub-topic=/waku/2/rs/66/0\ + --cluster-id=66 diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh index a96c0561e5..9e51261e29 100644 --- a/apps/liteprotocoltester/run_tester_node.sh +++ b/apps/liteprotocoltester/run_tester_node.sh @@ -1,5 +1,7 @@ #!/bin/sh +set -x + if test -f .env; then echo "Using .env file" . $(pwd)/.env @@ -16,10 +18,22 @@ NODE_INDEX=$((FOURTH_OCTET + 256 * THIRD_OCTET)) echo "NODE_INDEX $NODE_INDEX" +FUNCTION=$1 +if [ "${FUNCTION}" = "SENDER" ]; then + FUNCTION=--test-func=SENDER + SERVICENAME=lightpush-service +fi + +if [ "${FUNCTION}" = "RECEIVER" ]; then + FUNCTION=--test-func=RECEIVER + SERVICENAME=filter-service +fi + + RETRIES=${RETRIES:=10} while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do - SERIVCE_NODE_ADDR=$(wget -qO- http://servicenode:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/'); + SERIVCE_NODE_ADDR=$(wget -qO- http://${SERVICENAME}:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/'); echo "Service node not ready, retrying (retries left: ${RETRIES})" sleep 1 RETRIES=$(( $RETRIES - 1 )) @@ -30,47 +44,43 @@ if [ -z "${SERIVCE_NODE_ADDR}" ]; then exit 1 fi - if [ -n "${PUBSUB}" ]; then PUBSUB=--pubsub-topic="${PUBSUB}" +else + PUBSUB=--pubsub-topic="/waku/2/rs/66/0" fi if [ -n "${CONTENT_TOPIC}" ]; then CONTENT_TOPIC=--content-topic="${CONTENT_TOPIC}" fi -FUNCTION=$1 - -echo "Tester node: ${FUNCTION}" - -REST_PORT=--rest-port=8647 - -if [ "${FUNCTION}" = "SENDER" ]; then - FUNCTION=--test-func=SENDER - REST_PORT=--rest-port=8646 +if [ -n "${START_PUBLISHING_AFTER}" ]; then + START_PUBLISHING_AFTER=--start-publishing-after="${START_PUBLISHING_AFTER}" fi -if [ "${FUNCTION}" = "RECEIVER" ]; then - FUNCTION=--test-func=RECEIVER - REST_PORT=--rest-port=8647 +if [ -n "${MIN_MESSAGE_SIZE}" ]; then + MIN_MESSAGE_SIZE=--min-test-msg-size="${MIN_MESSAGE_SIZE}" fi -if [ -z "${FUNCTION}" ]; then - FUNCTION=--test-func=RECEIVER +if [ -n "${MAX_MESSAGE_SIZE}" ]; then + MAX_MESSAGE_SIZE=--max-test-msg-size="${MAX_MESSAGE_SIZE}" fi + +echo "Tester node: ${FUNCTION}" echo "Using service node: ${SERIVCE_NODE_ADDR}" + exec /usr/bin/liteprotocoltester\ - --log-level=DEBUG\ + --log-level=INFO\ --service-node="${SERIVCE_NODE_ADDR}"\ - --pubsub-topic=/waku/2/default-waku/proto\ - --cluster-id=0\ + --cluster-id=66\ --num-messages=${NUM_MESSAGES}\ --delay-messages=${DELAY_MESSAGES}\ --nat=extip:${IP}\ - ${FUNCTION}\ ${PUBSUB}\ ${CONTENT_TOPIC}\ - ${REST_PORT} - + ${FUNCTION}\ + ${START_PUBLISHING_AFTER}\ + ${MIN_MESSAGE_SIZE}\ + ${MAX_MESSAGE_SIZE} # --config-file=config.toml\ diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index 8f9a8c0503..a8c387d3df 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -24,8 +24,10 @@ import export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet const - LitePubsubTopic* = PubsubTopic("/waku/2/default-waku/proto") + LitePubsubTopic* = PubsubTopic("/waku/2/rs/66/0") LiteContentTopic* = ContentTopic("/tester/1/light-pubsub-example/proto") + DefaultMinTestMessageSizeStr* = "1KiB" + DefaultMaxTestMessageSizeStr* = "150KiB" type TesterFunctionality* = enum SENDER # pumps messages to the network @@ -74,6 +76,12 @@ type LiteProtocolTesterConf* = object desc: "Number of messages to send.", defaultValue: 120, name: "num-messages" .}: uint32 + startPublishingAfter* {. + desc: "Wait number of seconds before start publishing messages.", + defaultValue: 5, + name: "start-publishing-after" + .}: uint32 + delayMessages* {. desc: "Delay between messages in milliseconds.", defaultValue: 1000, @@ -103,8 +111,21 @@ type LiteProtocolTesterConf* = object "Cluster id that the node is running in. Node in a different cluster id is disconnected.", defaultValue: 0, name: "cluster-id" - .}: uint32 + .}: uint16 + minTestMessageSize* {. + desc: + "Minimum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc.", + defaultValue: DefaultMinTestMessageSizeStr, + name: "min-test-msg-size" + .}: string + + maxTestMessageSize* {. + desc: + "Maximum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc.", + defaultValue: DefaultMaxTestMessageSizeStr, + name: "max-test-msg-size" + .}: string ## Tester REST service configuration restAddress* {. desc: "Listening address of the REST HTTP server.", diff --git a/apps/liteprotocoltester/tester_message.nim b/apps/liteprotocoltester/tester_message.nim index 635a31ab40..c1ecc880b5 100644 --- a/apps/liteprotocoltester/tester_message.nim +++ b/apps/liteprotocoltester/tester_message.nim @@ -18,6 +18,7 @@ type ProtocolTesterMessage* = object startedAt*: int64 sinceStart*: int64 sincePrev*: int64 + size*: uint64 proc writeValue*( writer: var JsonWriter[RestJson], value: ProtocolTesterMessage @@ -29,6 +30,7 @@ proc writeValue*( writer.writeField("startedAt", value.startedAt) writer.writeField("sinceStart", value.sinceStart) writer.writeField("sincePrev", value.sincePrev) + writer.writeField("size", value.size) writer.endRecord() proc readValue*( @@ -41,6 +43,7 @@ proc readValue*( startedAt: Option[int64] sinceStart: Option[int64] sincePrev: Option[int64] + size: Option[uint64] for fieldName in readObjectFields(reader): case fieldName @@ -80,6 +83,12 @@ proc readValue*( "Multiple `sincePrev` fields found", "ProtocolTesterMessage" ) sincePrev = some(reader.readValue(int64)) + of "size": + if size.isSome(): + reader.raiseUnexpectedField( + "Multiple `size` fields found", "ProtocolTesterMessage" + ) + size = some(reader.readValue(uint64)) else: unrecognizedFieldWarning() @@ -101,6 +110,9 @@ proc readValue*( if sincePrev.isNone(): reader.raiseUnexpectedValue("Field `sincePrev` is missing") + if size.isNone(): + reader.raiseUnexpectedValue("Field `size` is missing") + value = ProtocolTesterMessage( sender: sender.get(), index: index.get(), @@ -108,4 +120,5 @@ proc readValue*( startedAt: startedAt.get(), sinceStart: sinceStart.get(), sincePrev: sincePrev.get(), + size: size.get(), )