From 5fbabec86affe07570685a92677ec3d0f40d954c Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 1 Nov 2023 10:36:10 +0000 Subject: [PATCH 01/16] adding logs Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 2378835..d82696c 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -774,10 +774,13 @@ func (pr *perfRunner) runLoop(tc TestCase) error { pr.stopTrackingRequest(nextTrackingID) } } - log.Infof("%d <-- %s Finished (loop=%d)", workerID, testName, loop) + secondsPerLoop := time.Since(startTime).Seconds() + log.Infof("%d <-- %s Finished (loop=%d) after %f seconds", workerID, testName, loop, secondsPerLoop) if histErr == nil { - hist.Observe(time.Since(startTime).Seconds()) + log.Infof("%d <-- %s Emmiting (loop=%d) after %f seconds", workerID, testName, loop, secondsPerLoop) + + hist.Observe(secondsPerLoop) } loop++ From 519f506bcf51725959ec8c8fccdb25575c1ecca0 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 1 Nov 2023 10:57:53 +0000 Subject: [PATCH 02/16] parallel submission Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index d82696c..45a3f81 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -719,6 +719,14 @@ func (pr *perfRunner) runLoop(tc TestCase) error { } startTime := time.Now() + + type ActionResponse struct { + trackingID string + err error + } + + actionResponses := make(chan *ActionResponse, tc.ActionsPerLoop()) + trackingIDs := make([]string, 0) for actionsCompleted = 0; actionsCompleted < tc.ActionsPerLoop(); actionsCompleted++ { @@ -729,22 +737,32 @@ func (pr *perfRunner) runLoop(tc TestCase) error { trackingID, err := tc.RunOnce() - if err != nil { + actionResponses <- &ActionResponse{ + trackingID: trackingID, + err: err, + } + } + resultCount := 0 + for { + aResponse := <-actionResponses + resultCount++ + if aResponse.err != nil { if pr.cfg.DelinquentAction == conf.DelinquentActionExit.String() { - return err + return aResponse.err } else { - log.Errorf("Worker %d error running job (logging but continuing): %s", workerID, err) - err = nil - continue + log.Errorf("Worker %d error running job (logging but continuing): %s", workerID, aResponse.err) } } else { - trackingIDs = append(trackingIDs, trackingID) - pr.markTestInFlight(tc, trackingID) - log.Infof("%d --> %s Sent %s: %s", workerID, testName, idType, trackingID) + trackingIDs = append(trackingIDs, aResponse.trackingID) + pr.markTestInFlight(tc, aResponse.trackingID) + log.Infof("%d --> %s Sent %s: %s", workerID, testName, idType, aResponse.trackingID) totalActionsCounter.Inc() } + // if we've reached the expected amount of metadata calls then stop + if resultCount == tc.ActionsPerLoop() { + break + } } - if testName == conf.PerfTestTokenMint.String() && pr.cfg.SkipMintConfirmations { // For minting tests a worker can (if configured) skip waiting for a matching response event // before making itself available for the next job From e5dc115300cbf131bc9806e87de0f49b92fd7c78 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 1 Nov 2023 11:02:11 +0000 Subject: [PATCH 03/16] more logging Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 45a3f81..6a3e373 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -760,6 +760,7 @@ func (pr *perfRunner) runLoop(tc TestCase) error { } // if we've reached the expected amount of metadata calls then stop if resultCount == tc.ActionsPerLoop() { + log.Infof("%d --> %s All actions sent %d", workerID, testName, resultCount) break } } From 20a7a4917a7f5e48dce2997213e0151e97b7c4d8 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 1 Nov 2023 11:07:40 +0000 Subject: [PATCH 04/16] more logs Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 6a3e373..29b268f 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -760,7 +760,7 @@ func (pr *perfRunner) runLoop(tc TestCase) error { } // if we've reached the expected amount of metadata calls then stop if resultCount == tc.ActionsPerLoop() { - log.Infof("%d --> %s All actions sent %d", workerID, testName, resultCount) + log.Infof("%d --> %s All actions sent %d after %f seconds", workerID, testName, resultCount, time.Since(startTime).Seconds()) break } } From c34f96751bcf189c9b567f64acb6f1b3c58a8401 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 1 Nov 2023 11:12:46 +0000 Subject: [PATCH 05/16] more logs Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 29b268f..446411c 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -736,7 +736,7 @@ func (pr *perfRunner) runLoop(tc TestCase) error { } trackingID, err := tc.RunOnce() - + log.Infof("%d --> %s action %d sent after %f seconds", workerID, testName, actionsCompleted, time.Since(startTime).Seconds()) actionResponses <- &ActionResponse{ trackingID: trackingID, err: err, From 2585b87cd246bc85fb46884f00d52b1834dca13f Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 1 Nov 2023 11:13:55 +0000 Subject: [PATCH 06/16] parallel Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 446411c..0bb11a7 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -734,13 +734,15 @@ func (pr *perfRunner) runLoop(tc TestCase) error { if pr.allActionsComplete() { break } - - trackingID, err := tc.RunOnce() - log.Infof("%d --> %s action %d sent after %f seconds", workerID, testName, actionsCompleted, time.Since(startTime).Seconds()) - actionResponses <- &ActionResponse{ - trackingID: trackingID, - err: err, - } + actionCount := actionsCompleted + go func() { + trackingID, err := tc.RunOnce() + log.Infof("%d --> %s action %d sent after %f seconds", workerID, testName, actionCount, time.Since(startTime).Seconds()) + actionResponses <- &ActionResponse{ + trackingID: trackingID, + err: err, + } + }() } resultCount := 0 for { From 0ef9982b1f7f3ea8a7896bd3e7f302bbf3e367c7 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 1 Nov 2023 11:17:19 +0000 Subject: [PATCH 07/16] fixing iteration count Signed-off-by: Chengxuan Xing --- internal/perf/blob_broadcast_msg.go | 2 +- internal/perf/blob_private_msg.go | 2 +- internal/perf/broadcast_msg.go | 2 +- internal/perf/custom_ethereum_contract.go | 6 ++---- internal/perf/custom_fabric_contract.go | 6 ++---- internal/perf/perf.go | 4 ++-- internal/perf/private_msg.go | 2 +- internal/perf/token_mint.go | 2 +- 8 files changed, 11 insertions(+), 15 deletions(-) diff --git a/internal/perf/blob_broadcast_msg.go b/internal/perf/blob_broadcast_msg.go index ca41f1e..ebda500 100644 --- a/internal/perf/blob_broadcast_msg.go +++ b/internal/perf/blob_broadcast_msg.go @@ -33,7 +33,7 @@ func (tc *blobBroadcast) IDType() TrackingIDType { return TrackingIDTypeMessageID } -func (tc *blobBroadcast) RunOnce() (string, error) { +func (tc *blobBroadcast) RunOnce(iterationCount int) (string, error) { blob, hash := tc.generateBlob(big.NewInt(1024)) dataID, err := tc.uploadBlob(blob, hash, tc.pr.client.BaseURL) diff --git a/internal/perf/blob_private_msg.go b/internal/perf/blob_private_msg.go index 7d7e788..1c0d735 100644 --- a/internal/perf/blob_private_msg.go +++ b/internal/perf/blob_private_msg.go @@ -33,7 +33,7 @@ func (tc *blobPrivate) IDType() TrackingIDType { return TrackingIDTypeMessageID } -func (tc *blobPrivate) RunOnce() (string, error) { +func (tc *blobPrivate) RunOnce(iterationCount int) (string, error) { blob, hash := tc.generateBlob(big.NewInt(1024)) dataID, err := tc.uploadBlob(blob, hash, tc.pr.client.BaseURL) diff --git a/internal/perf/broadcast_msg.go b/internal/perf/broadcast_msg.go index 8e8b1c6..3656d03 100644 --- a/internal/perf/broadcast_msg.go +++ b/internal/perf/broadcast_msg.go @@ -32,7 +32,7 @@ func (tc *broadcast) IDType() TrackingIDType { return TrackingIDTypeMessageID } -func (tc *broadcast) RunOnce() (string, error) { +func (tc *broadcast) RunOnce(iterationCount int) (string, error) { payload := fmt.Sprintf(`{ "data":[ diff --git a/internal/perf/custom_ethereum_contract.go b/internal/perf/custom_ethereum_contract.go index a945437..b255bb4 100644 --- a/internal/perf/custom_ethereum_contract.go +++ b/internal/perf/custom_ethereum_contract.go @@ -30,7 +30,6 @@ import ( type customEthereum struct { testBase - iteration int } func newCustomEthereumTestWorker(pr *perfRunner, workerID int, actionsPerLoop int) TestCase { @@ -51,8 +50,8 @@ func (tc *customEthereum) IDType() TrackingIDType { return TrackingIDTypeWorkerNumber } -func (tc *customEthereum) RunOnce() (string, error) { - idempotencyKey := tc.pr.getIdempotencyKey(tc.workerID, tc.iteration) +func (tc *customEthereum) RunOnce(iterationCount int) (string, error) { + idempotencyKey := tc.pr.getIdempotencyKey(tc.workerID, iterationCount) invokeOptionsJSON := "" if tc.pr.cfg.InvokeOptions != nil { b, err := json.Marshal(tc.pr.cfg.InvokeOptions) @@ -106,6 +105,5 @@ func (tc *customEthereum) RunOnce() (string, error) { return "", fmt.Errorf("Error invoking contract [%d]: %s (%+v)", resStatus(res), err, &resError) } } - tc.iteration++ return strconv.Itoa(tc.workerID), nil } diff --git a/internal/perf/custom_fabric_contract.go b/internal/perf/custom_fabric_contract.go index a0e2dc4..3b2ea23 100644 --- a/internal/perf/custom_fabric_contract.go +++ b/internal/perf/custom_fabric_contract.go @@ -29,7 +29,6 @@ import ( type customFabric struct { testBase - iteration int } func newCustomFabricTestWorker(pr *perfRunner, workerID int, actionsPerLoop int) TestCase { @@ -50,8 +49,8 @@ func (tc *customFabric) IDType() TrackingIDType { return TrackingIDTypeWorkerNumber } -func (tc *customFabric) RunOnce() (string, error) { - idempotencyKey := tc.pr.getIdempotencyKey(tc.workerID, tc.iteration) +func (tc *customFabric) RunOnce(iterationCount int) (string, error) { + idempotencyKey := tc.pr.getIdempotencyKey(tc.workerID, iterationCount) invokeOptionsJSON := "" if tc.pr.cfg.InvokeOptions != nil { b, err := json.Marshal(tc.pr.cfg.InvokeOptions) @@ -143,6 +142,5 @@ func (tc *customFabric) RunOnce() (string, error) { if err != nil || res.IsError() { return "", fmt.Errorf("Error invoking contract [%d]: %s (%+v)", resStatus(res), err, &resError) } - tc.iteration++ return strconv.Itoa(tc.workerID), nil } diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 0bb11a7..d41a36b 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -141,7 +141,7 @@ const ( type TestCase interface { WorkerID() int - RunOnce() (trackingID string, err error) + RunOnce(iterationCount int) (trackingID string, err error) IDType() TrackingIDType Name() string ActionsPerLoop() int @@ -736,7 +736,7 @@ func (pr *perfRunner) runLoop(tc TestCase) error { } actionCount := actionsCompleted go func() { - trackingID, err := tc.RunOnce() + trackingID, err := tc.RunOnce(actionCount) log.Infof("%d --> %s action %d sent after %f seconds", workerID, testName, actionCount, time.Since(startTime).Seconds()) actionResponses <- &ActionResponse{ trackingID: trackingID, diff --git a/internal/perf/private_msg.go b/internal/perf/private_msg.go index 3f1646a..838fce6 100644 --- a/internal/perf/private_msg.go +++ b/internal/perf/private_msg.go @@ -32,7 +32,7 @@ func (tc *private) IDType() TrackingIDType { return TrackingIDTypeMessageID } -func (tc *private) RunOnce() (string, error) { +func (tc *private) RunOnce(iterationCount int) (string, error) { payload := fmt.Sprintf(`{ "data": [ diff --git a/internal/perf/token_mint.go b/internal/perf/token_mint.go index 64ccb48..66ba563 100644 --- a/internal/perf/token_mint.go +++ b/internal/perf/token_mint.go @@ -60,7 +60,7 @@ func (tc *tokenMint) GetSigningKey() string { return "" } -func (tc *tokenMint) RunOnce() (string, error) { +func (tc *tokenMint) RunOnce(iterationCount int) (string, error) { var payload string mintAmount := 10 if tc.pr.cfg.TokenOptions.TokenType == core.TokenTypeNonFungible.String() { From aa845c412761490af3551e6af416da88c5b3d79e Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 1 Nov 2023 11:21:57 +0000 Subject: [PATCH 08/16] more unique idem key Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index d41a36b..595816c 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -1296,7 +1296,7 @@ func (pr *perfRunner) getIdempotencyKey(workerId int, iteration int) string { workerIdStr := fmt.Sprintf("%05d", workerId) // Left pad iteration ID to 9 digits (supporting up to 999,999,999 iterations) iterationIdStr := fmt.Sprintf("%09d", iteration) - return fmt.Sprintf("%v-%s-%s", pr.startTime, workerIdStr, iterationIdStr) + return fmt.Sprintf("%v-%s-%s-%s", pr.startTime, workerIdStr, iterationIdStr, fftypes.NewUUID()) } func (pr *perfRunner) calculateCurrentTps(logValue bool) float64 { From 79bc909dd407f6d05f039c458c039032378705f8 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 1 Nov 2023 15:05:26 +0000 Subject: [PATCH 09/16] more logs Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 2378835..7808113 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -719,6 +719,9 @@ func (pr *perfRunner) runLoop(tc TestCase) error { } startTime := time.Now() + var sentTime time.Time + var submissionSeconds float64 + var eventReceivingSeconds float64 trackingIDs := make([]string, 0) for actionsCompleted = 0; actionsCompleted < tc.ActionsPerLoop(); actionsCompleted++ { @@ -728,7 +731,8 @@ func (pr *perfRunner) runLoop(tc TestCase) error { } trackingID, err := tc.RunOnce() - + submissionSeconds = time.Since(startTime).Seconds() + sentTime = time.Now() if err != nil { if pr.cfg.DelinquentAction == conf.DelinquentActionExit.String() { return err @@ -774,7 +778,11 @@ func (pr *perfRunner) runLoop(tc TestCase) error { pr.stopTrackingRequest(nextTrackingID) } } - log.Infof("%d <-- %s Finished (loop=%d)", workerID, testName, loop) + eventReceivingSeconds = time.Since(sentTime).Seconds() + total := submissionSeconds + eventReceivingSeconds + subPortion := int((submissionSeconds / total) * 100) + envPortion := int((eventReceivingSeconds / total) * 100) + log.Infof("%d <-- %s Finished (loop=%d), submission time: %f s, event receive time: %f s. Ratio (%d/%d)", workerID, testName, loop, submissionSeconds, eventReceivingSeconds, subPortion, envPortion) if histErr == nil { hist.Observe(time.Since(startTime).Seconds()) From d0e5d5d28a7e7d873c564100e6aab9da89865aa4 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 8 Nov 2023 14:55:34 +0000 Subject: [PATCH 10/16] adding latency metrics Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 67 ++++++++++++++++++++++++--------- internal/util/report_builder.go | 62 ++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 17 deletions(-) create mode 100644 internal/util/report_builder.go diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 767f28c..678250f 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -36,6 +36,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/wsclient" "github.com/hyperledger/firefly-perf-cli/internal/conf" + "github.com/hyperledger/firefly-perf-cli/internal/util" "github.com/hyperledger/firefly/pkg/core" dto "github.com/prometheus/client_model/go" log "github.com/sirupsen/logrus" @@ -155,16 +156,22 @@ type inflightTest struct { var mintStartingBalance int type perfRunner struct { - bfr chan int - cfg *conf.RunnerConfig - client *resty.Client - ctx context.Context - shutdown context.CancelFunc - stopping bool - startTime int64 - endTime int64 - startRampTime int64 - endRampTime int64 + bfr chan int + cfg *conf.RunnerConfig + client *resty.Client + ctx context.Context + shutdown context.CancelFunc + stopping bool + startTime int64 + endSendTime int64 + endTime int64 + startRampTime int64 + endRampTime int64 + + sendTime *util.Latency + receiveTime *util.Latency + totalTime *util.Latency + msgTimeMap map[string]*inflightTest rampSummary int64 totalSummary int64 @@ -227,6 +234,9 @@ func New(config *conf.RunnerConfig) PerfRunner { startTime: startTime, endTime: endTime, poolName: poolName, + sendTime: &util.Latency{}, + receiveTime: &util.Latency{}, + totalTime: &util.Latency{}, poolConnectorName: config.TokenOptions.TokenPoolConnectorName, tagPrefix: fmt.Sprintf("perf_%s", wsUUID.String()), msgTimeMap: make(map[string]*inflightTest), @@ -505,6 +515,7 @@ perfLoop: measuredActions := pr.totalSummary measuredTime := time.Since(time.Unix(pr.startTime, 0)).Seconds() measuredTps := pr.calculateCurrentTps(true) + measuredSendTps := pr.calculateSendTps() // we sleep on shutdown / completion to allow for Prometheus metrics to be scraped one final time // After 30 seconds workers should be completed, so we check for delinquent messages @@ -527,7 +538,11 @@ perfLoop: log.Infof(" - Prometheus metric actions_submitted_total = %f\n", getMetricVal(totalActionsCounter)) log.Infof(" - Test duration (secs): %2f", measuredTime) log.Infof(" - Measured actions: %d", measuredActions) - log.Infof(" - Measured actions/sec: %2f", measuredTps) + log.Infof(" - Measured send TPS: %2f", measuredSendTps) + log.Infof(" - Measured throughput: %2f", measuredTps) + log.Infof(" - Measured send duration: %s", pr.sendTime) + log.Infof(" - Measured event receiving duration: %s", pr.receiveTime) + log.Infof(" - Measured total duration: %s", pr.totalTime) return nil } @@ -765,9 +780,13 @@ func (pr *perfRunner) runLoop(tc TestCase) error { } // if we've reached the expected amount of metadata calls then stop if resultCount == tc.ActionsPerLoop() { - submissionSecondsPerLoop = time.Since(startTime).Seconds() + submissionDurationPerLoop := time.Since(startTime) + pr.sendTime.Record(submissionDurationPerLoop) + submissionSecondsPerLoop = submissionDurationPerLoop.Seconds() sentTime = time.Now() log.Infof("%d --> %s All actions sent %d after %f seconds", workerID, testName, resultCount, submissionSecondsPerLoop) + + pr.endSendTime = time.Now().Unix() break } } @@ -800,8 +819,14 @@ func (pr *perfRunner) runLoop(tc TestCase) error { pr.stopTrackingRequest(nextTrackingID) } } - secondsPerLoop := time.Since(startTime).Seconds() - eventReceivingSecondsPerLoop = time.Since(sentTime).Seconds() + totalDurationPerLoop := time.Since(startTime) + pr.totalTime.Record(totalDurationPerLoop) + secondsPerLoop := totalDurationPerLoop.Seconds() + + eventReceivingDurationPerLoop := time.Since(sentTime) + eventReceivingSecondsPerLoop = eventReceivingDurationPerLoop.Seconds() + pr.receiveTime.Record(totalDurationPerLoop) + total := submissionSecondsPerLoop + eventReceivingSecondsPerLoop subPortion := int((submissionSecondsPerLoop / total) * 100) envPortion := int((eventReceivingSecondsPerLoop / total) * 100) @@ -1015,13 +1040,13 @@ func (pr *perfRunner) detectDelinquentBalance() bool { func (pr *perfRunner) markTestInFlight(tc TestCase, trackingID string) { mutex.Lock() + defer mutex.Unlock() if len(trackingID) > 0 { pr.msgTimeMap[trackingID] = &inflightTest{ testCase: tc, time: time.Now(), } } - mutex.Unlock() } func (pr *perfRunner) recordCompletedAction() { @@ -1031,14 +1056,14 @@ func (pr *perfRunner) recordCompletedAction() { pr.totalSummary++ } mutex.Lock() - mutex.Unlock() + defer mutex.Unlock() pr.calculateCurrentTps(true) } func (pr *perfRunner) stopTrackingRequest(trackingID string) { mutex.Lock() + defer mutex.Unlock() delete(pr.msgTimeMap, trackingID) - mutex.Unlock() } func (pr *perfRunner) createEthereumContractListener(nodeURL string) (string, error) { @@ -1327,6 +1352,14 @@ func (pr *perfRunner) calculateCurrentTps(logValue bool) float64 { } return currentTps } +func (pr *perfRunner) calculateSendTps() float64 { + measuredActions := pr.totalSummary + sendDuration := time.Duration((pr.endSendTime - pr.startTime) * int64(time.Second)) + durationSec := sendDuration.Seconds() + sendTps := float64(measuredActions) / durationSec + log.Infof("Send TPS: %v Measured Actions: %v Duration: %v", sendTps, measuredActions, durationSec) + return sendTps +} func (pr *perfRunner) ramping() bool { if time.Now().Before(time.Unix(pr.endRampTime, 0)) { diff --git a/internal/util/report_builder.go b/internal/util/report_builder.go new file mode 100644 index 0000000..bc6c0de --- /dev/null +++ b/internal/util/report_builder.go @@ -0,0 +1,62 @@ +package util + +import ( + "fmt" + "sync" + "time" +) + +type Summary struct { + Latency *Latency `json:"latency"` + TPS *TPS `json:"tps"` + ResultCount *TPS `json:"resultCount"` +} + +type SystemUnderTest struct { +} + +type TPS struct { + SendRate float64 `json:"sendRate"` + Throughput float64 `json:"throughput"` +} + +type ResultCount struct { + TotalCount int64 `json:"totalCount"` + RetryCount int64 `json:"retryCount"` +} + +type Latency struct { + mux sync.Mutex + min time.Duration + max time.Duration + total int64 + count int64 +} + +func (lt *Latency) Record(latency time.Duration) { + lt.mux.Lock() + defer lt.mux.Unlock() + if latency < lt.min { + lt.min = latency + } + if latency > lt.max { + lt.max = latency + } + lt.total += latency.Milliseconds() + lt.count++ +} + +func (lt *Latency) Avg() time.Duration { + return time.Duration((lt.total / lt.count) * int64(time.Millisecond)) +} + +func (lt *Latency) Min() time.Duration { + return lt.min +} + +func (lt *Latency) Max() time.Duration { + return lt.max +} +func (lt *Latency) String() string { + return fmt.Sprintf("min: %s, max: %s, avg: %s", lt.Min(), lt.Max(), lt.Avg()) +} From 74e8c47602b06d9395152222ae36ac062ae30533 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 8 Nov 2023 15:37:27 +0000 Subject: [PATCH 11/16] less noisy Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 678250f..697b353 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -775,7 +775,7 @@ func (pr *perfRunner) runLoop(tc TestCase) error { } else { trackingIDs = append(trackingIDs, aResponse.trackingID) pr.markTestInFlight(tc, aResponse.trackingID) - log.Infof("%d --> %s Sent %s: %s", workerID, testName, idType, aResponse.trackingID) + log.Debugf("%d --> %s Sent %s: %s", workerID, testName, idType, aResponse.trackingID) totalActionsCounter.Inc() } // if we've reached the expected amount of metadata calls then stop @@ -784,7 +784,7 @@ func (pr *perfRunner) runLoop(tc TestCase) error { pr.sendTime.Record(submissionDurationPerLoop) submissionSecondsPerLoop = submissionDurationPerLoop.Seconds() sentTime = time.Now() - log.Infof("%d --> %s All actions sent %d after %f seconds", workerID, testName, resultCount, submissionSecondsPerLoop) + log.Debugf("%d --> %s All actions sent %d after %f seconds", workerID, testName, resultCount, submissionSecondsPerLoop) pr.endSendTime = time.Now().Unix() break From be7b418b053b2b4d1cfce9dd4e04a6557e82ff76 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 8 Nov 2023 15:40:39 +0000 Subject: [PATCH 12/16] fixing min calculation Signed-off-by: Chengxuan Xing --- internal/util/report_builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/util/report_builder.go b/internal/util/report_builder.go index bc6c0de..919a5a3 100644 --- a/internal/util/report_builder.go +++ b/internal/util/report_builder.go @@ -36,7 +36,7 @@ type Latency struct { func (lt *Latency) Record(latency time.Duration) { lt.mux.Lock() defer lt.mux.Unlock() - if latency < lt.min { + if latency < lt.min || lt.min.Nanoseconds() == 0 { lt.min = latency } if latency > lt.max { From 1e4d48e669e954447de4c0781be5a8f06ddca0a5 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 8 Nov 2023 15:42:53 +0000 Subject: [PATCH 13/16] more debug logs Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 697b353..29572db 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -755,7 +755,7 @@ func (pr *perfRunner) runLoop(tc TestCase) error { actionCount := actionsCompleted go func() { trackingID, err := tc.RunOnce(actionCount) - log.Infof("%d --> %s action %d sent after %f seconds", workerID, testName, actionCount, time.Since(startTime).Seconds()) + log.Debugf("%d --> %s action %d sent after %f seconds", workerID, testName, actionCount, time.Since(startTime).Seconds()) actionResponses <- &ActionResponse{ trackingID: trackingID, err: err, From 7649578edd1d93d7802e15b5089a1ff465bbc5bf Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Fri, 10 Nov 2023 13:01:59 +0000 Subject: [PATCH 14/16] add report builder Signed-off-by: Chengxuan Xing --- .gitignore | 1 + cmd/run.go | 12 +- go.sum | 6 - internal/conf/conf.go | 6 +- internal/perf/perf.go | 53 +++++---- internal/util/report_builder.go | 197 ++++++++++++++++++++++++++++++-- 6 files changed, 233 insertions(+), 42 deletions(-) diff --git a/.gitignore b/.gitignore index 92f7f07..aeaf3e4 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ main .DS_Store ffperf/ffperf ff-perf.log +ffperf-report.html .vscode dist/ *.iml diff --git a/cmd/run.go b/cmd/run.go index d754775..e67c0ae 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -28,6 +28,7 @@ import ( "github.com/hyperledger/firefly-perf-cli/internal/perf" "github.com/hyperledger/firefly-perf-cli/internal/server" "github.com/hyperledger/firefly-perf-cli/internal/types" + "github.com/hyperledger/firefly-perf-cli/internal/util" "github.com/hyperledger/firefly/pkg/core" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -65,17 +66,22 @@ Executes a instance within a performance test suite to generate synthetic load a log.Warn("both the \"instance-name\" and \"instance-index\" flags were provided, using \"instance-name\"") } - instance, err := selectInstance(config) + instanceConfig, err := selectInstance(config) if err != nil { return err } - runnerConfig, err := generateRunnerConfigFromInstance(instance, config) + runnerConfig, err := generateRunnerConfigFromInstance(instanceConfig, config) if err != nil { return err } - perfRunner = perf.New(runnerConfig) + configYaml, err := yaml.Marshal(instanceConfig) + if err != nil { + return err + } + + perfRunner = perf.New(runnerConfig, util.NewReportForTestInstance(string(configYaml), instanceName)) httpServer = server.NewHttpServer() return nil diff --git a/go.sum b/go.sum index b506040..6d3972b 100644 --- a/go.sum +++ b/go.sum @@ -173,12 +173,6 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hyperledger/firefly v1.2.0 h1:No82vzsur3TODU0giIECDcMnWQ/8BRGoYo7QK/avr4A= github.com/hyperledger/firefly v1.2.0/go.mod h1:tmpTfSjX/NIa7xHTtTb36S48X9+3nNutY7ZxLt3lgCU= -github.com/hyperledger/firefly-common v1.2.13 h1:4pGL8LusXoijeoxM9J36fzBq4jvZpZbGjpQqgempXMk= -github.com/hyperledger/firefly-common v1.2.13/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM= -github.com/hyperledger/firefly-common v1.2.14 h1:HON9GJZXvrL0l2AG5DWHSGiBh05hElgFS5lm1OPR83M= -github.com/hyperledger/firefly-common v1.2.14/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM= -github.com/hyperledger/firefly-common v1.2.15 h1:WdNB65IJvIyiOhVW3nxB3sQKqtJbdJ7ie0PJIM11CSU= -github.com/hyperledger/firefly-common v1.2.15/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM= github.com/hyperledger/firefly-common v1.2.16 h1:cVSaxKycOb+/oT2wExbrzxr68aVKQObeBOLaiJ0mTLg= github.com/hyperledger/firefly-common v1.2.16/go.mod h1:17lOH4YufiPy82LpKm8fPa/YXJ0pUyq01zK1CmklJwM= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= diff --git a/internal/conf/conf.go b/internal/conf/conf.go index d05bc93..17a3998 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -81,7 +81,7 @@ type InstanceConfig struct { MaxTimePerAction time.Duration `json:"maxTimePerAction,omitempty" yaml:"maxTimePerAction,omitempty"` MaxActions int64 `json:"maxActions,omitempty" yaml:"maxActions,omitempty"` RampLength time.Duration `json:"rampLength,omitempty" yaml:"rampLength,omitempty"` - SkipMintConfirmations bool `json:"skipMintConfirmations,omitempty" yaml:"skipMintConfirmations,omitempty"` + SkipMintConfirmations bool `json:"skipMintConfirmations" yaml:"skipMintConfirmations"` DelinquentAction string `json:"delinquentAction,omitempty" yaml:"delinquentAction,omitempty"` PerWorkerSigningKeyPrefix string `json:"perWorkerSigningKeyPrefix,omitempty" yaml:"perWorkerSigningKeyPrefix,omitempty"` } @@ -121,8 +121,8 @@ type TokenConfig struct { type ContractOptions struct { Address string `json:"address" yaml:"address"` - Channel string `json:"channel" yaml:"channel"` - Chaincode string `json:"chaincode" yaml:"chaincode"` + Channel string `json:"channel,omitempty" yaml:"channel,omitempty"` + Chaincode string `json:"chaincode,omitempty" yaml:"chaincode,omitempty"` } type FireFlyWsConfig struct { diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 29572db..844540d 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -156,21 +156,23 @@ type inflightTest struct { var mintStartingBalance int type perfRunner struct { - bfr chan int - cfg *conf.RunnerConfig - client *resty.Client - ctx context.Context - shutdown context.CancelFunc - stopping bool + bfr chan int + cfg *conf.RunnerConfig + client *resty.Client + ctx context.Context + shutdown context.CancelFunc + stopping bool + startTime int64 endSendTime int64 endTime int64 startRampTime int64 endRampTime int64 - sendTime *util.Latency - receiveTime *util.Latency - totalTime *util.Latency + reportBuilder *util.Report + sendTime *util.Latency + receiveTime *util.Latency + totalTime *util.Latency msgTimeMap map[string]*inflightTest rampSummary int64 @@ -196,7 +198,7 @@ type SubscriptionInfo struct { Job fftypes.FFEnum } -func New(config *conf.RunnerConfig) PerfRunner { +func New(config *conf.RunnerConfig, reportBuilder *util.Report) PerfRunner { if config.LogLevel != "" { if level, err := log.ParseLevel(config.LogLevel); err == nil { log.SetLevel(level) @@ -234,6 +236,7 @@ func New(config *conf.RunnerConfig) PerfRunner { startTime: startTime, endTime: endTime, poolName: poolName, + reportBuilder: reportBuilder, sendTime: &util.Latency{}, receiveTime: &util.Latency{}, totalTime: &util.Latency{}, @@ -514,8 +517,22 @@ perfLoop: pr.stopping = true measuredActions := pr.totalSummary measuredTime := time.Since(time.Unix(pr.startTime, 0)).Seconds() - measuredTps := pr.calculateCurrentTps(true) - measuredSendTps := pr.calculateSendTps() + + testNames := make([]string, len(pr.cfg.Tests)) + for _, t := range pr.cfg.Tests { + testNames = append(testNames, t.Name.String()) + } + testNameString := testNames[0] + if len(testNames) > 1 { + testNameString = strings.Join(testNames[:], ",") + } + tps := util.GenerateTPS(measuredActions, pr.startTime, pr.endSendTime) + pr.reportBuilder.AddTestRunMetrics(testNameString, measuredActions, measuredTime, tps, pr.totalTime) + err = pr.reportBuilder.GenerateHTML() + + if err != nil { + log.Errorf("failed to generate performance report: %+v", err) + } // we sleep on shutdown / completion to allow for Prometheus metrics to be scraped one final time // After 30 seconds workers should be completed, so we check for delinquent messages @@ -538,8 +555,8 @@ perfLoop: log.Infof(" - Prometheus metric actions_submitted_total = %f\n", getMetricVal(totalActionsCounter)) log.Infof(" - Test duration (secs): %2f", measuredTime) log.Infof(" - Measured actions: %d", measuredActions) - log.Infof(" - Measured send TPS: %2f", measuredSendTps) - log.Infof(" - Measured throughput: %2f", measuredTps) + log.Infof(" - Measured send TPS: %2f", tps.SendRate) + log.Infof(" - Measured throughput: %2f", tps.Throughput) log.Infof(" - Measured send duration: %s", pr.sendTime) log.Infof(" - Measured event receiving duration: %s", pr.receiveTime) log.Infof(" - Measured total duration: %s", pr.totalTime) @@ -1352,14 +1369,6 @@ func (pr *perfRunner) calculateCurrentTps(logValue bool) float64 { } return currentTps } -func (pr *perfRunner) calculateSendTps() float64 { - measuredActions := pr.totalSummary - sendDuration := time.Duration((pr.endSendTime - pr.startTime) * int64(time.Second)) - durationSec := sendDuration.Seconds() - sendTps := float64(measuredActions) / durationSec - log.Infof("Send TPS: %v Measured Actions: %v Duration: %v", sendTps, measuredActions, durationSec) - return sendTps -} func (pr *perfRunner) ramping() bool { if time.Now().Before(time.Unix(pr.endRampTime, 0)) { diff --git a/internal/util/report_builder.go b/internal/util/report_builder.go index 919a5a3..aa44e93 100644 --- a/internal/util/report_builder.go +++ b/internal/util/report_builder.go @@ -2,17 +2,189 @@ package util import ( "fmt" + "html/template" + "os" "sync" "time" + + log "github.com/sirupsen/logrus" ) -type Summary struct { - Latency *Latency `json:"latency"` - TPS *TPS `json:"tps"` - ResultCount *TPS `json:"resultCount"` +type TestRunMetrics struct { + Name string + TotalActions string + Duration string + SendRate string + MinLatency string + MaxLatency string + AvgLatency string + Throughput string +} +type Report struct { + RunnerConfig string + TestInstanceName string + TestRuns []TestRunMetrics } -type SystemUnderTest struct { +func (r *Report) GenerateHTML() error { + htmlTemplate := ` + + + + + + HyperLedger Firefly Performance Report + + + + + +
+

Test runner configuration

+ +
+{{.RunnerConfig}}
+            
+
+
+ +
+

Test metrics

+

+ Test instance:{{.TestInstanceName}} +

+
+ + + + + + + + + + + + {{range .TestRuns}} + + + + + + + + + + + {{end}} +
Test nameTest duration (secs)ActionsSend TPSMin LatencyMax LatencyAvg LatencyThroughput
{{.Name}}{{.TotalActions}}{{.Duration}}{{.SendRate}}{{.MinLatency}}{{.MaxLatency}}{{.AvgLatency}}{{.Throughput}}
+
+
+ + +` + // Execute the template + tmpl, err := template.New("template").Parse(htmlTemplate) + if err != nil { + return err + } + + // Create or open the output file + outputFile, err := os.Create("ffperf-report.html") + if err != nil { + return err + } + defer outputFile.Close() + + // Write the HTML output to the file + err = tmpl.Execute(outputFile, r) + if err != nil { + return err + } + + return nil +} + +func (r *Report) AddTestRunMetrics(name string, totalActions int64, duration float64, tps *TPS, lt *Latency) { + r.TestRuns = append(r.TestRuns, TestRunMetrics{ + Name: name, + TotalActions: fmt.Sprintf("%d", totalActions), + Duration: fmt.Sprintf("%f", duration), + SendRate: fmt.Sprintf("%f", tps.SendRate), + Throughput: fmt.Sprintf("%f", tps.Throughput), + MinLatency: lt.Min().String(), + MaxLatency: lt.Max().String(), + AvgLatency: lt.Avg().String(), + }) +} + +func NewReportForTestInstance(runnerConfig string, instanceName string) *Report { + return &Report{ + RunnerConfig: runnerConfig, + TestInstanceName: instanceName, + TestRuns: make([]TestRunMetrics, 0), + } } type TPS struct { @@ -20,9 +192,18 @@ type TPS struct { Throughput float64 `json:"throughput"` } -type ResultCount struct { - TotalCount int64 `json:"totalCount"` - RetryCount int64 `json:"retryCount"` +func GenerateTPS(totalActions int64, startTime int64, endSendTime int64) *TPS { + sendDuration := time.Duration((endSendTime - startTime) * int64(time.Second)) + sendDurationSec := sendDuration.Seconds() + sendRate := float64(totalActions) / sendDurationSec + + totalDurationSec := time.Since(time.Unix(startTime, 0)).Seconds() + throughput := float64(totalActions) / totalDurationSec + log.Infof("Send rate: %f, Throughput: %f, Measured Actions: %v Duration: %v (Send duration: %v)", sendRate, throughput, totalActions, sendDurationSec, totalDurationSec) + return &TPS{ + SendRate: sendRate, + Throughput: throughput, + } } type Latency struct { From 07422e0e310124443b997ab7f763eb62e2767fcd Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Fri, 10 Nov 2023 13:08:53 +0000 Subject: [PATCH 15/16] duration as duration string Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 4 ++-- internal/util/report_builder.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 844540d..5fbb1c3 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -516,7 +516,7 @@ perfLoop: pr.stopping = true measuredActions := pr.totalSummary - measuredTime := time.Since(time.Unix(pr.startTime, 0)).Seconds() + measuredTime := time.Since(time.Unix(pr.startTime, 0)) testNames := make([]string, len(pr.cfg.Tests)) for _, t := range pr.cfg.Tests { @@ -553,7 +553,7 @@ perfLoop: log.Infof(" - Prometheus metric incomplete_events_total = %f\n", getMetricVal(incompleteEventsCounter)) log.Infof(" - Prometheus metric delinquent_msgs_total = %f\n", getMetricVal(delinquentMsgsCounter)) log.Infof(" - Prometheus metric actions_submitted_total = %f\n", getMetricVal(totalActionsCounter)) - log.Infof(" - Test duration (secs): %2f", measuredTime) + log.Infof(" - Test duration: %s", measuredTime) log.Infof(" - Measured actions: %d", measuredActions) log.Infof(" - Measured send TPS: %2f", tps.SendRate) log.Infof(" - Measured throughput: %2f", tps.Throughput) diff --git a/internal/util/report_builder.go b/internal/util/report_builder.go index aa44e93..88d3078 100644 --- a/internal/util/report_builder.go +++ b/internal/util/report_builder.go @@ -118,7 +118,7 @@ func (r *Report) GenerateHTML() error { - + @@ -166,11 +166,11 @@ func (r *Report) GenerateHTML() error { return nil } -func (r *Report) AddTestRunMetrics(name string, totalActions int64, duration float64, tps *TPS, lt *Latency) { +func (r *Report) AddTestRunMetrics(name string, totalActions int64, duration time.Duration, tps *TPS, lt *Latency) { r.TestRuns = append(r.TestRuns, TestRunMetrics{ Name: name, TotalActions: fmt.Sprintf("%d", totalActions), - Duration: fmt.Sprintf("%f", duration), + Duration: fmt.Sprintf("%s", duration), SendRate: fmt.Sprintf("%f", tps.SendRate), Throughput: fmt.Sprintf("%f", tps.Throughput), MinLatency: lt.Min().String(), From 8f04508802d8674f4d6ff42e9b1020c0d930f227 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Fri, 10 Nov 2023 13:30:57 +0000 Subject: [PATCH 16/16] remove comma Signed-off-by: Chengxuan Xing --- internal/perf/perf.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/perf/perf.go b/internal/perf/perf.go index 5fbb1c3..10d3458 100644 --- a/internal/perf/perf.go +++ b/internal/perf/perf.go @@ -519,8 +519,8 @@ perfLoop: measuredTime := time.Since(time.Unix(pr.startTime, 0)) testNames := make([]string, len(pr.cfg.Tests)) - for _, t := range pr.cfg.Tests { - testNames = append(testNames, t.Name.String()) + for i, t := range pr.cfg.Tests { + testNames[i] = t.Name.String() } testNameString := testNames[0] if len(testNames) > 1 {
Test nameTest duration (secs)Test duration Actions Send TPS Min Latency