Skip to content

Commit

Permalink
Enhancement on statistics reports, added list of sent messages with h…
Browse files Browse the repository at this point in the history
…ash, fixed latency calculations
  • Loading branch information
NagyZoltanPeter committed Aug 2, 2024
1 parent 78ed6a7 commit 9657455
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 60 deletions.
8 changes: 4 additions & 4 deletions apps/liteprotocoltester/.env
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
NWAKU_IMAGE=quay.io/wakuorg/nwaku-pr:2800-rln-v2

START_PUBLISHING_AFTER=30 # seconds
NUM_MESSAGES=250
DELAY_MESSAGES=200 # gap between messages
NUM_MESSAGES=10000
DELAY_MESSAGES=168 # gap between messages

MIN_MESSAGE_SIZE=1Kb
MAX_MESSAGE_SIZE=120Kb
MIN_MESSAGE_SIZE=15Kb
MAX_MESSAGE_SIZE=145Kb

# PUBSUB=/waku/2/rs/66/0
# CONTENT_TOPIC=/tester/1/light-pubsub-example/proto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
ARG NIMFLAGS
ARG MAKE_TARGET=liteprotocoltester
ARG NIM_COMMIT
ARG LOG_LEVEL=TRACE
ARG LOG_LEVEL=DEBUG

# Get build tools and required header files
RUN apk add --no-cache bash git build-base pcre-dev linux-headers curl jq
Expand Down
2 changes: 1 addition & 1 deletion apps/liteprotocoltester/filter_subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
proc(udata: pointer) {.gcsafe.} =
stats.echoStats()

if stats.checkIfAllMessagesReceived():
if waitFor stats.checkIfAllMessagesReceived():
waitFor unsubscribe(
wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0]
)
Expand Down
11 changes: 11 additions & 0 deletions apps/liteprotocoltester/lightpush_publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ proc prepareMessage(

return (message, renderSize)

var sentMessages {.threadvar.}: OrderedTable[uint32, tuple[hash: string, relayed: bool]]

proc publishMessages(
wakuNode: WakuNode,
lightpushPubsubTopic: PubsubTopic,
Expand Down Expand Up @@ -97,13 +99,15 @@ proc publishMessages(
let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex

if wlpRes.isOk():
sentMessages[messagesSent] = (hash: msgHash, relayed: true)
notice "published message using lightpush",
index = messagesSent,
count = numMessages,
size = msgSize,
pubsubTopic = lightpushPubsubTopic,
hash = msgHash
else:
sentMessages[messagesSent] = (hash: msgHash, relayed: false)
error "failed to publish message using lightpush",
err = wlpRes.error, hash = msgHash
inc(failedToSendCount)
Expand All @@ -122,6 +126,12 @@ proc publishMessages(
else:
echo report.get()

echo "*--------------------------------------------------------------------------------------------------*"
echo "| Index | Relayed | Hash |"
for (index, info) in sentMessages.pairs:
echo fmt"|{index:>10}|{info.relayed:<9}| {info.hash}"
echo "*--------------------------------------------------------------------------------------------------*"

discard c_raise(ansi_c.SIGTERM)

proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
Expand All @@ -145,6 +155,7 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =

info "Start sending messages to service node using lightpush"

sentMessages.sort(system.cmp)
# Start maintaining subscription
asyncSpawn publishMessages(
wakuNode,
Expand Down
146 changes: 92 additions & 54 deletions apps/liteprotocoltester/statistics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@ import
std/[sets, tables, strutils, sequtils, options, strformat],
chronos/timer as chtimer,
chronicles,
chronos,
results

import ./tester_message

type
ArrivalInfo = object
arrivedAt: Moment
prevArrivedAt: Moment
prevIndex: uint32

MessageInfo = tuple[msg: ProtocolTesterMessage, info: ArrivalInfo]

StatHelper = object
prevIndex: uint32
prevArrivedAt: Moment
Expand All @@ -17,14 +25,13 @@ type
maxIndex: uint32

Statistics* = object
received: Table[uint32, MessageInfo]
firstReceivedIdx*: uint32
allMessageCount*: uint32
receivedMessages*: uint32
misorderCount*: uint32
lateCount*: uint32
duplicateCount*: uint32
minLatency*: Duration
maxLatency*: Duration
cummulativeLatency: Duration
helper: StatHelper

PerPeerStatistics* = Table[string, Statistics]
Expand All @@ -42,66 +49,43 @@ proc init*(T: type Statistics, expectedMessageCount: int = 1000): T =
result.helper.prevIndex = 0
result.helper.maxIndex = 0
result.helper.seenIndices.init(expectedMessageCount)
result.minLatency = nanos(0)
result.maxLatency = nanos(0)
result.cummulativeLatency = nanos(0)
result.received = initTable[uint32, MessageInfo](expectedMessageCount)
return result

proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
if self.allMessageCount == 0:
self.allMessageCount = msg.count
self.firstReceivedIdx = msg.index
elif self.allMessageCount != msg.count:
warn "Message count mismatch at message",
error "Message count mismatch at message",
index = msg.index, expected = self.allMessageCount, got = msg.count

if not self.helper.seenIndices.contains(msg.index):
self.helper.seenIndices.incl(msg.index)
else:
inc(self.duplicateCount)
let currentArrived: MessageInfo = (
msg: msg,
info: ArrivalInfo(
arrivedAt: Moment.now(),
prevArrivedAt: self.helper.prevArrivedAt,
prevIndex: self.helper.prevIndex,
),
)

if self.received.hasKeyOrPut(msg.index, currentArrived):
warn "Duplicate message", index = msg.index
## just do not count into stats
inc(self.duplicateCount)
return

## detect misorder arrival and possible lost messages
if self.helper.prevIndex + 1 < msg.index:
inc(self.misorderCount)
warn "Misordered message arrival",
index = msg.index, expected = self.helper.prevIndex + 1

## collect possible lost message indicies
for idx in self.helper.prevIndex + 1 ..< msg.index:
self.helper.lostIndices.incl(idx)
elif self.helper.prevIndex > msg.index:
inc(self.lateCount)
warn "Late message arrival", index = msg.index, expected = self.helper.prevIndex + 1
else:
## may remove late arrival
self.helper.lostIndices.excl(msg.index)

## calculate latency
let currentArrivedAt = Moment.now()

let delaySincePrevArrived: Duration = currentArrivedAt - self.helper.prevArrivedAt

let expectedDelay: Duration = nanos(msg.sincePrev)

var latency: Duration

# if we have any latency...
if expectedDelay > delaySincePrevArrived:
latency = delaySincePrevArrived - expectedDelay
if self.minLatency.isZero or (latency < self.minLatency and latency > nanos(0)):
self.minLatency = latency
if latency > self.maxLatency:
self.maxLatency = latency
self.cummulativeLatency += latency
else:
warn "Negative latency detected",
index = msg.index, expected = expectedDelay, actual = delaySincePrevArrived

self.helper.maxIndex = max(self.helper.maxIndex, msg.index)
self.helper.prevIndex = msg.index
self.helper.prevArrivedAt = currentArrivedAt
self.helper.prevArrivedAt = currentArrived.info.arrivedAt
inc(self.receivedMessages)

proc addMessage*(
Expand All @@ -116,21 +100,61 @@ proc addMessage*(
proc lossCount*(self: Statistics): uint32 =
self.helper.maxIndex - self.receivedMessages

proc averageLatency*(self: Statistics): Duration =
if self.receivedMessages == 0:
return nanos(0)
return self.cummulativeLatency div self.receivedMessages
proc calcLatency*(self: Statistics): tuple[min, max, avg: Duration] =
var
minLatency = nanos(0)
maxLatency = nanos(0)
avgLatency = nanos(0)

if self.receivedMessages > 2:
try:
var prevArrivedAt = self.received[self.firstReceivedIdx].info.arrivedAt

for idx, (msg, arrival) in self.received.pairs:
if idx <= 1:
continue
let expectedDelay = nanos(msg.sincePrev)

## latency will be 0 if arrived in shorter time than expected
var latency = arrival.arrivedAt - arrival.prevArrivedAt - expectedDelay

if latency > nanos(0):
if minLatency == nanos(0):
minLatency = latency
else:
minLatency = min(minLatency, latency)

maxLatency = max(maxLatency, latency)
avgLatency += latency

avgLatency = avgLatency div (self.receivedMessages - 1)
except KeyError:
error "Error while calculating latency"

return (minLatency, maxLatency, avgLatency)

proc missingIndices*(self: Statistics): seq[uint32] =
var missing: seq[uint32] = @[]
for idx in 1 .. self.helper.maxIndex:
if not self.received.hasKey(idx):
missing.add(idx)
return missing

proc echoStat*(self: Statistics) =
let (minL, maxL, avgL) = self.calcLatency()

let printable = catch:
"""*------------------------------------------------------------------------------------------*
| Expected | Received | Target | Loss | Misorder | Late | Duplicate |
|{self.helper.maxIndex:>11} |{self.receivedMessages:>11} |{self.allMessageCount:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} |{self.duplicateCount:>11} |
*------------------------------------------------------------------------------------------*
| Latency stat: |
| avg latency: {$self.averageLatency():<73}|
| min latency: {$self.maxLatency:<73}|
| max latency: {$self.minLatency:<73}|
| min latency: {$minL:<73}|
| avg latency: {$avgL:<73}|
| max latency: {$maxL:<73}|
*------------------------------------------------------------------------------------------*
| Lost indices: |
| {self.missingIndices()} |
*------------------------------------------------------------------------------------------*""".fmt()

if printable.isErr():
Expand All @@ -139,6 +163,8 @@ proc echoStat*(self: Statistics) =
echo printable.get()

proc jsonStat*(self: Statistics): string =
let minL, maxL, avgL = self.calcLatency()

let json = catch:
"""{{"expected":{self.helper.maxIndex},
"received": {self.receivedMessages},
Expand All @@ -148,10 +174,11 @@ proc jsonStat*(self: Statistics): string =
"late": {self.lateCount},
"duplicate": {self.duplicateCount},
"latency":
{{"avg": "{self.averageLatency()}",
"min": "{self.minLatency}",
"max": "{self.maxLatency}"
}}
{{"avg": "{avgL}",
"min": "{minL}",
"max": "{maxL}"
}},
"lostIndices": {self.missingIndices()}
}}""".fmt()
if json.isErr:
return "{\"result:\": \"" & json.error.msg & "\"}"
Expand Down Expand Up @@ -189,14 +216,25 @@ proc jsonStats*(self: PerPeerStatistics): string =
"{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() &
"\"}"

proc checkIfAllMessagesReceived*(self: PerPeerStatistics): bool =
proc checkIfAllMessagesReceived*(self: PerPeerStatistics): Future[bool] {.async.} =
# if there are no peers have sent messages, assume we just have started.
if self.len == 0:
return false

for stat in self.values:
if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or
stat.receivedMessages < stat.allMessageCount:
stat.helper.maxIndex < stat.allMessageCount:
return false

## Ok, we see last message arrived from all peers,
## lets check if all messages are received
## and if not let's wait another 20 secs to give chance the system will send them.
var shallWait = false
for stat in self.values:
if stat.receivedMessages < stat.allMessageCount:
shallWait = true

if shallWait:
await sleepAsync(chtimer.seconds(20))

return true

0 comments on commit 9657455

Please sign in to comment.