From 9657455442cb2100475de36514071ed3043ed75e Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 18 Jun 2024 09:15:05 +0200 Subject: [PATCH] Enhancement on statistics reports, added list of sent messages with hash, fixed latency calculations --- apps/liteprotocoltester/.env | 8 +- .../Dockerfile.liteprotocoltester.compile | 2 +- apps/liteprotocoltester/filter_subscriber.nim | 2 +- .../lightpush_publisher.nim | 11 ++ apps/liteprotocoltester/statistics.nim | 146 +++++++++++------- 5 files changed, 109 insertions(+), 60 deletions(-) diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env index e4359cade2..c1ab26c33c 100644 --- a/apps/liteprotocoltester/.env +++ b/apps/liteprotocoltester/.env @@ -1,11 +1,11 @@ NWAKU_IMAGE=quay.io/wakuorg/nwaku-pr:2800-rln-v2 START_PUBLISHING_AFTER=30 # seconds -NUM_MESSAGES=250 -DELAY_MESSAGES=200 # gap between messages +NUM_MESSAGES=10000 +DELAY_MESSAGES=168 # gap between messages -MIN_MESSAGE_SIZE=1Kb -MAX_MESSAGE_SIZE=120Kb +MIN_MESSAGE_SIZE=15Kb +MAX_MESSAGE_SIZE=145Kb # PUBSUB=/waku/2/rs/66/0 # CONTENT_TOPIC=/tester/1/light-pubsub-example/proto diff --git a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile index bef14abb54..5ad5f83852 100644 --- a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile +++ b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile @@ -4,7 +4,7 @@ ARG NIMFLAGS ARG MAKE_TARGET=liteprotocoltester ARG NIM_COMMIT - ARG LOG_LEVEL=TRACE + ARG LOG_LEVEL=DEBUG # Get build tools and required header files RUN apk add --no-cache bash git build-base pcre-dev linux-headers curl jq diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/filter_subscriber.nim index 8da493c2d6..1ee2bdcca3 100644 --- a/apps/liteprotocoltester/filter_subscriber.nim +++ b/apps/liteprotocoltester/filter_subscriber.nim @@ -102,7 +102,7 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = proc(udata: pointer) {.gcsafe.} = stats.echoStats() - if stats.checkIfAllMessagesReceived(): + if waitFor stats.checkIfAllMessagesReceived(): waitFor unsubscribe( wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0] ) diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim index 6a0265364d..5264a8c638 100644 --- a/apps/liteprotocoltester/lightpush_publisher.nim +++ b/apps/liteprotocoltester/lightpush_publisher.nim @@ -66,6 +66,8 @@ proc prepareMessage( return (message, renderSize) +var sentMessages {.threadvar.}: OrderedTable[uint32, tuple[hash: string, relayed: bool]] + proc publishMessages( wakuNode: WakuNode, lightpushPubsubTopic: PubsubTopic, @@ -97,6 +99,7 @@ proc publishMessages( let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex if wlpRes.isOk(): + sentMessages[messagesSent] = (hash: msgHash, relayed: true) notice "published message using lightpush", index = messagesSent, count = numMessages, @@ -104,6 +107,7 @@ proc publishMessages( pubsubTopic = lightpushPubsubTopic, hash = msgHash else: + sentMessages[messagesSent] = (hash: msgHash, relayed: false) error "failed to publish message using lightpush", err = wlpRes.error, hash = msgHash inc(failedToSendCount) @@ -122,6 +126,12 @@ proc publishMessages( else: echo report.get() + echo "*--------------------------------------------------------------------------------------------------*" + echo "| Index | Relayed | Hash |" + for (index, info) in sentMessages.pairs: + echo fmt"|{index:>10}|{info.relayed:<9}| {info.hash}" + echo "*--------------------------------------------------------------------------------------------------*" + discard c_raise(ansi_c.SIGTERM) proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = @@ -145,6 +155,7 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = info "Start sending messages to service node using lightpush" + sentMessages.sort(system.cmp) # Start maintaining subscription asyncSpawn publishMessages( wakuNode, diff --git a/apps/liteprotocoltester/statistics.nim b/apps/liteprotocoltester/statistics.nim index 13e749193c..cb98be3686 100644 --- a/apps/liteprotocoltester/statistics.nim +++ b/apps/liteprotocoltester/statistics.nim @@ -4,11 +4,19 @@ import std/[sets, tables, strutils, sequtils, options, strformat], chronos/timer as chtimer, chronicles, + chronos, results import ./tester_message type + ArrivalInfo = object + arrivedAt: Moment + prevArrivedAt: Moment + prevIndex: uint32 + + MessageInfo = tuple[msg: ProtocolTesterMessage, info: ArrivalInfo] + StatHelper = object prevIndex: uint32 prevArrivedAt: Moment @@ -17,14 +25,13 @@ type maxIndex: uint32 Statistics* = object + received: Table[uint32, MessageInfo] + firstReceivedIdx*: uint32 allMessageCount*: uint32 receivedMessages*: uint32 misorderCount*: uint32 lateCount*: uint32 duplicateCount*: uint32 - minLatency*: Duration - maxLatency*: Duration - cummulativeLatency: Duration helper: StatHelper PerPeerStatistics* = Table[string, Statistics] @@ -42,24 +49,29 @@ proc init*(T: type Statistics, expectedMessageCount: int = 1000): T = result.helper.prevIndex = 0 result.helper.maxIndex = 0 result.helper.seenIndices.init(expectedMessageCount) - result.minLatency = nanos(0) - result.maxLatency = nanos(0) - result.cummulativeLatency = nanos(0) + result.received = initTable[uint32, MessageInfo](expectedMessageCount) return result proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) = if self.allMessageCount == 0: self.allMessageCount = msg.count + self.firstReceivedIdx = msg.index elif self.allMessageCount != msg.count: - warn "Message count mismatch at message", + error "Message count mismatch at message", index = msg.index, expected = self.allMessageCount, got = msg.count - if not self.helper.seenIndices.contains(msg.index): - self.helper.seenIndices.incl(msg.index) - else: - inc(self.duplicateCount) + let currentArrived: MessageInfo = ( + msg: msg, + info: ArrivalInfo( + arrivedAt: Moment.now(), + prevArrivedAt: self.helper.prevArrivedAt, + prevIndex: self.helper.prevIndex, + ), + ) + + if self.received.hasKeyOrPut(msg.index, currentArrived): warn "Duplicate message", index = msg.index - ## just do not count into stats + inc(self.duplicateCount) return ## detect misorder arrival and possible lost messages @@ -67,41 +79,13 @@ proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) = inc(self.misorderCount) warn "Misordered message arrival", index = msg.index, expected = self.helper.prevIndex + 1 - - ## collect possible lost message indicies - for idx in self.helper.prevIndex + 1 ..< msg.index: - self.helper.lostIndices.incl(idx) elif self.helper.prevIndex > msg.index: inc(self.lateCount) warn "Late message arrival", index = msg.index, expected = self.helper.prevIndex + 1 - else: - ## may remove late arrival - self.helper.lostIndices.excl(msg.index) - - ## calculate latency - let currentArrivedAt = Moment.now() - - let delaySincePrevArrived: Duration = currentArrivedAt - self.helper.prevArrivedAt - - let expectedDelay: Duration = nanos(msg.sincePrev) - - var latency: Duration - - # if we have any latency... - if expectedDelay > delaySincePrevArrived: - latency = delaySincePrevArrived - expectedDelay - if self.minLatency.isZero or (latency < self.minLatency and latency > nanos(0)): - self.minLatency = latency - if latency > self.maxLatency: - self.maxLatency = latency - self.cummulativeLatency += latency - else: - warn "Negative latency detected", - index = msg.index, expected = expectedDelay, actual = delaySincePrevArrived self.helper.maxIndex = max(self.helper.maxIndex, msg.index) self.helper.prevIndex = msg.index - self.helper.prevArrivedAt = currentArrivedAt + self.helper.prevArrivedAt = currentArrived.info.arrivedAt inc(self.receivedMessages) proc addMessage*( @@ -116,21 +100,61 @@ proc addMessage*( proc lossCount*(self: Statistics): uint32 = self.helper.maxIndex - self.receivedMessages -proc averageLatency*(self: Statistics): Duration = - if self.receivedMessages == 0: - return nanos(0) - return self.cummulativeLatency div self.receivedMessages +proc calcLatency*(self: Statistics): tuple[min, max, avg: Duration] = + var + minLatency = nanos(0) + maxLatency = nanos(0) + avgLatency = nanos(0) + + if self.receivedMessages > 2: + try: + var prevArrivedAt = self.received[self.firstReceivedIdx].info.arrivedAt + + for idx, (msg, arrival) in self.received.pairs: + if idx <= 1: + continue + let expectedDelay = nanos(msg.sincePrev) + + ## latency will be 0 if arrived in shorter time than expected + var latency = arrival.arrivedAt - arrival.prevArrivedAt - expectedDelay + + if latency > nanos(0): + if minLatency == nanos(0): + minLatency = latency + else: + minLatency = min(minLatency, latency) + + maxLatency = max(maxLatency, latency) + avgLatency += latency + + avgLatency = avgLatency div (self.receivedMessages - 1) + except KeyError: + error "Error while calculating latency" + + return (minLatency, maxLatency, avgLatency) + +proc missingIndices*(self: Statistics): seq[uint32] = + var missing: seq[uint32] = @[] + for idx in 1 .. self.helper.maxIndex: + if not self.received.hasKey(idx): + missing.add(idx) + return missing proc echoStat*(self: Statistics) = + let (minL, maxL, avgL) = self.calcLatency() + let printable = catch: """*------------------------------------------------------------------------------------------* | Expected | Received | Target | Loss | Misorder | Late | Duplicate | |{self.helper.maxIndex:>11} |{self.receivedMessages:>11} |{self.allMessageCount:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} |{self.duplicateCount:>11} | *------------------------------------------------------------------------------------------* | Latency stat: | -| avg latency: {$self.averageLatency():<73}| -| min latency: {$self.maxLatency:<73}| -| max latency: {$self.minLatency:<73}| +| min latency: {$minL:<73}| +| avg latency: {$avgL:<73}| +| max latency: {$maxL:<73}| +*------------------------------------------------------------------------------------------* +| Lost indices: | +| {self.missingIndices()} | *------------------------------------------------------------------------------------------*""".fmt() if printable.isErr(): @@ -139,6 +163,8 @@ proc echoStat*(self: Statistics) = echo printable.get() proc jsonStat*(self: Statistics): string = + let minL, maxL, avgL = self.calcLatency() + let json = catch: """{{"expected":{self.helper.maxIndex}, "received": {self.receivedMessages}, @@ -148,10 +174,11 @@ proc jsonStat*(self: Statistics): string = "late": {self.lateCount}, "duplicate": {self.duplicateCount}, "latency": - {{"avg": "{self.averageLatency()}", - "min": "{self.minLatency}", - "max": "{self.maxLatency}" - }} + {{"avg": "{avgL}", + "min": "{minL}", + "max": "{maxL}" + }}, + "lostIndices": {self.missingIndices()} }}""".fmt() if json.isErr: return "{\"result:\": \"" & json.error.msg & "\"}" @@ -189,14 +216,25 @@ proc jsonStats*(self: PerPeerStatistics): string = "{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() & "\"}" -proc checkIfAllMessagesReceived*(self: PerPeerStatistics): bool = +proc checkIfAllMessagesReceived*(self: PerPeerStatistics): Future[bool] {.async.} = # if there are no peers have sent messages, assume we just have started. if self.len == 0: return false for stat in self.values: if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or - stat.receivedMessages < stat.allMessageCount: + stat.helper.maxIndex < stat.allMessageCount: return false + ## Ok, we see last message arrived from all peers, + ## lets check if all messages are received + ## and if not let's wait another 20 secs to give chance the system will send them. + var shallWait = false + for stat in self.values: + if stat.receivedMessages < stat.allMessageCount: + shallWait = true + + if shallWait: + await sleepAsync(chtimer.seconds(20)) + return true