Skip to content

Commit

Permalink
Fix reset latency
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Nov 25, 2024
1 parent e8e7420 commit d840587
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
2 changes: 1 addition & 1 deletion examples/tls/getting_started_tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func main() {
streamName,
handleMessages,
stream.NewConsumerOptions().
SetConsumerName("my_consumer"). // set a consumer name
SetConsumerName("my_consumer"). // set a consumer name
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
CheckErr(err)
channelClose := consumer.NotifyClose()
Expand Down
17 changes: 8 additions & 9 deletions perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,15 @@ func printStats() {
select {
case _ = <-ticker.C:
v := time.Now().Sub(start).Milliseconds()

PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
//latency := float64(totalLatency) / float64(atomic.LoadInt32(&consumerMessageCount))
averageLatency := int64(0)
if atomic.LoadInt32(&consumerMessageCount) > 0 {
PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount))
ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms",
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency)
}

ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms",
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency)
}
}

Expand All @@ -100,6 +97,7 @@ func printStats() {

atomic.SwapInt32(&publisherMessageCount, 0)
atomic.SwapInt32(&consumerMessageCount, 0)
atomic.SwapInt64(&totalLatency, 0)
atomic.SwapInt32(&confirmedMessageCount, 0)
atomic.SwapInt32(&notConfirmedMessageCount, 0)
start = time.Now()
Expand Down Expand Up @@ -159,8 +157,9 @@ func startSimulation() error {
err := initStreams()
checkErr(err)

//
simulEnvironment, err = stream.NewEnvironment(stream.NewEnvironmentOptions().
SetUris(rabbitmqBrokerUrl).
SetUri(rabbitmqBrokerUrl[0]).
SetMaxProducersPerClient(publishersPerClient).
SetMaxConsumersPerClient(consumersPerClient))
checkErr(err)
Expand Down

0 comments on commit d840587

Please sign in to comment.