Skip to content

Commit

Permalink
fix: stop adaptive rate limit after the test is finished (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
leovct committed Jun 22, 2023
1 parent 2eeeb40 commit ad3526c
Showing 1 changed file with 24 additions and 19 deletions.
43 changes: 24 additions & 19 deletions cmd/loadtest/loadtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,32 +547,35 @@ func getTxPoolSize(rpc *ethrpc.Client) (uint64, error) {
return pendingCount + queuedCount, nil
}

func updateRateLimit(rl *rate.Limiter, rpc *ethrpc.Client, steadyStateQueueSize uint64, rateLimitIncrement uint64, cycleDuration time.Duration, backoff float64) {
func updateRateLimit(ctx context.Context, rl *rate.Limiter, rpc *ethrpc.Client, steadyStateQueueSize uint64, rateLimitIncrement uint64, cycleDuration time.Duration, backoff float64) {
ticker := time.NewTicker(cycleDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
txPoolSize, err := getTxPoolSize(rpc)
if err != nil {
log.Error().Err(err).Msg("Error getting txpool size")
return
}

for range ticker.C {
txPoolSize, err := getTxPoolSize(rpc)
if err != nil {
log.Error().Err(err).Msg("Error getting txpool size")
if txPoolSize < steadyStateQueueSize {
// additively increment requests per second if txpool less than queue steady state
newRateLimit := rate.Limit(float64(rl.Limit()) + float64(rateLimitIncrement))
rl.SetLimit(newRateLimit)
log.Info().Float64("New Rate Limit (RPS)", float64(rl.Limit())).Uint64("Current Tx Pool Size", txPoolSize).Uint64("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Increased rate limit")
} else if txPoolSize > steadyStateQueueSize {
// halve rate limit requests per second if txpool greater than queue steady state
rl.SetLimit(rl.Limit() / rate.Limit(backoff))
log.Info().Float64("New Rate Limit (RPS)", float64(rl.Limit())).Uint64("Current Tx Pool Size", txPoolSize).Uint64("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Backed off rate limit")
}
case <-ctx.Done():
return
}

if txPoolSize < steadyStateQueueSize {
// additively increment requests per second if txpool less than queue steady state
newRateLimit := rate.Limit(float64(rl.Limit()) + float64(rateLimitIncrement))
rl.SetLimit(newRateLimit)
log.Info().Float64("New Rate Limit (RPS)", float64(rl.Limit())).Uint64("Current Tx Pool Size", txPoolSize).Uint64("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Increased rate limit")
} else if txPoolSize > steadyStateQueueSize {
// halve rate limit requests per second if txpool greater than queue steady state
rl.SetLimit(rl.Limit() / rate.Limit(backoff))
log.Info().Float64("New Rate Limit (RPS)", float64(rl.Limit())).Uint64("Current Tx Pool Size", txPoolSize).Uint64("Steady State Tx Pool Size", steadyStateQueueSize).Msg("Backed off rate limit")
}
}
}

func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) error {

ltp := inputLoadTestParams
log.Trace().Interface("Input Params", ltp).Msg("Params")

Expand All @@ -589,8 +592,10 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro
if *ltp.RateLimit <= 0.0 {
rl = nil
}
rateLimitCtx, cancel := context.WithCancel(ctx)
defer cancel()
if *ltp.AdaptiveRateLimit && rl != nil {
go updateRateLimit(rl, rpc, steadyStateTxPoolSize, adaptiveRateLimitIncrement, time.Duration(*ltp.AdaptiveCycleDuration)*time.Second, *ltp.AdaptiveBackoffFactor)
go updateRateLimit(rateLimitCtx, rl, rpc, steadyStateTxPoolSize, adaptiveRateLimitIncrement, time.Duration(*ltp.AdaptiveCycleDuration)*time.Second, *ltp.AdaptiveBackoffFactor)
}

tops, err := bind.NewKeyedTransactorWithChainID(privateKey, chainID)
Expand Down Expand Up @@ -842,10 +847,10 @@ func mainLoop(ctx context.Context, c *ethclient.Client, rpc *ethrpc.Client) erro
}
wg.Done()
}(i)

}
log.Trace().Msg("Finished starting go routines. Waiting..")
wg.Wait()
cancel()
log.Debug().Uint64("currentNonce", currentNonce).Msg("Finished main loadtest loop")
log.Debug().Msg("Waiting for transactions to actually be mined")
finalBlockNumber, err := waitForFinalBlock(ctx, c, rpc, startBlockNumber, startNonce, currentNonce)
Expand Down

0 comments on commit ad3526c

Please sign in to comment.