Skip to content

Commit

Permalink
Merge pull request #1629 from orbs-network/feature/stress-test
Browse files Browse the repository at this point in the history
Improved stress test
  • Loading branch information
Kirill Maksimov authored Oct 27, 2020
2 parents 7fafaab + 4ab4bb9 commit ecccc93
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 14 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/orbs-network/healthcheck v1.3.0
github.com/orbs-network/lean-helix-go v0.5.1-0.20201011065550-9473b7e1df05
github.com/orbs-network/membuffers v0.4.0
github.com/orbs-network/orbs-client-sdk-go v0.18.0
github.com/orbs-network/orbs-client-sdk-go v0.19.0
github.com/orbs-network/orbs-contract-sdk v1.8.0
github.com/orbs-network/orbs-spec v0.0.0-20201013064336-2e9a104f3993
github.com/orbs-network/scribe v0.2.3
Expand Down
46 changes: 41 additions & 5 deletions test/e2e/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ type E2EConfig struct {

type StressTestConfig struct {
enabled bool
async bool
skipState bool
numberOfTransactions int64
acceptableFailureRate int64
targetTPS float64
apiEndpoints []string
}

const START_HTTP_PORT = 8090
Expand Down Expand Up @@ -94,11 +97,34 @@ func (h *Harness) DeployNativeContract(from *keys.Ed25519KeyPair, contractName s
}

func (h *Harness) SendTransaction(senderPublicKey []byte, senderPrivateKey []byte, contractName string, methodName string, args ...interface{}) (*codec.TransactionResponse, string, error) {
payload, txId, err := h.client.CreateTransaction(senderPublicKey, senderPrivateKey, contractName, methodName, args...)
return h.SendTransactionWithClient(h.client, senderPublicKey, senderPrivateKey, contractName, methodName, args...)
}

func (h *Harness) SendTransactionWithClient(client *orbsClient.OrbsClient, senderPublicKey []byte, senderPrivateKey []byte, contractName string, methodName string, args ...interface{}) (*codec.TransactionResponse, string, error) {
payload, txId, err := client.CreateTransaction(senderPublicKey, senderPrivateKey, contractName, methodName, args...)
if err != nil {
return nil, txId, err
}
out, err := client.SendTransaction(payload)
if err != nil {
return nil, txId, err
}
return out.TransactionResponse, txId, err
}

func (h *Harness) SendTransactionAsync(senderPublicKey []byte, senderPrivateKey []byte, contractName string, methodName string, args ...interface{}) (*codec.TransactionResponse, string, error) {
return h.SendTransactionAsyncWithClient(h.client, senderPublicKey, senderPrivateKey, contractName, methodName, args...)
}

func (h *Harness) SendTransactionAsyncWithClient(client *orbsClient.OrbsClient, senderPublicKey []byte, senderPrivateKey []byte, contractName string, methodName string, args ...interface{}) (*codec.TransactionResponse, string, error) {
payload, txId, err := client.CreateTransaction(senderPublicKey, senderPrivateKey, contractName, methodName, args...)
if err != nil {
return nil, txId, err
}
out, err := client.SendTransactionAsync(payload)
if err != nil {
return nil, txId, err
}
out, err := h.client.SendTransaction(payload)
return out.TransactionResponse, txId, err
}

Expand Down Expand Up @@ -163,7 +189,7 @@ func (h *Harness) GetBlockHeight() primitives.BlockHeight {

if blockHeight, found := metricReader.GetAsInt(blockstorage.MetricBlockHeight); !found {
return 0
} else {
} else {
return primitives.BlockHeight(blockHeight)
}
}
Expand All @@ -176,7 +202,7 @@ func (h *Harness) GetTransactionCount() int64 {

if txCount, found := metricReader.GetAsInt(transactionpool.MetricCommittedPoolTransactions); !found {
return 0
} else {
} else {
return txCount
}
}
Expand Down Expand Up @@ -204,7 +230,7 @@ func (h *Harness) WaitUntilTransactionPoolIsReady(t *testing.T) {

if lastCommittedTimestamp, found := metricReader.GetAsInt(transactionpool.MetricLastCommittedTime); !found {
return false
} else {
} else {
diff := lastCommittedTimestamp - time.Now().Add(recentBlockTimeDiff*-1).UnixNano()
return diff >= 0
}
Expand Down Expand Up @@ -259,6 +285,8 @@ func GetConfig() E2EConfig {
stressTestNumberOfTransactions := int64(10000)
stressTestFailureRate := int64(2)
stressTestTargetTPS := float64(700)
stressTestAsync := os.Getenv("STRESS_TEST_ASYNC") == "true"
stressTestSkipState := os.Getenv("STRESS_TEST_SKIP_STATE") == "true"

ethereumEndpoint := "http://127.0.0.1:8545"

Expand All @@ -268,10 +296,15 @@ func GetConfig() E2EConfig {
ethereumEndpoint = os.Getenv("ETHEREUM_ENDPOINT")
}

var stressTestAPIEndpoints = []string{os.Getenv("API_ENDPOINT")}
if stressTestEnabled {
stressTestNumberOfTransactions, _ = strconv.ParseInt(os.Getenv("STRESS_TEST_NUMBER_OF_TRANSACTIONS"), 10, 0)
stressTestFailureRate, _ = strconv.ParseInt(os.Getenv("STRESS_TEST_FAILURE_RATE"), 10, 0)
stressTestTargetTPS, _ = strconv.ParseFloat(os.Getenv("STRESS_TEST_TARGET_TPS"), 0)
stressTestAPIEndpointsOverride := strings.Split(os.Getenv("STRESS_TEST_API_ENDPOINTS"), ",")
if len(stressTestAPIEndpointsOverride) > 0 {
stressTestAPIEndpoints = stressTestAPIEndpointsOverride
}
}

return E2EConfig{
Expand All @@ -281,9 +314,12 @@ func GetConfig() E2EConfig {
AppChainUrl: appChainUrl,
StressTest: StressTestConfig{
enabled: stressTestEnabled,
async: stressTestAsync,
numberOfTransactions: stressTestNumberOfTransactions,
acceptableFailureRate: stressTestFailureRate,
targetTPS: stressTestTargetTPS,
apiEndpoints: stressTestAPIEndpoints,
skipState: stressTestSkipState,
},
EthereumEndpoint: ethereumEndpoint,
}
Expand Down
43 changes: 35 additions & 8 deletions test/e2e/network_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package e2e
import (
"context"
"fmt"
"github.com/orbs-network/orbs-client-sdk-go/codec"
"sync"
"testing"
"time"
Expand All @@ -24,7 +25,8 @@ func TestE2EStress(t *testing.T) {
h := NewAppHarness()
ctrlRand := rand.NewControlledRand(t)

config := GetConfig().StressTest
generalConfig := GetConfig()
config := generalConfig.StressTest

if !config.enabled {
t.Skip("Skipping stress test")
Expand All @@ -34,35 +36,59 @@ func TestE2EStress(t *testing.T) {

var wg sync.WaitGroup

limiter := rate.NewLimiter(1000, 50)
limiter := rate.NewLimiter(rate.Limit(config.targetTPS), 50)

var mutex sync.Mutex
var errors []error
var errorTransactionStatuses []string

var clients []*orbsClient.OrbsClient
for _, apiEndpoint := range config.apiEndpoints {
clients = append(clients, orbsClient.NewClient(apiEndpoint, uint32(generalConfig.AppVcid), codec.NETWORK_TYPE_TEST_NET))
}

defaultTarget, _ := orbsClient.CreateAccount()

for i := int64(0); i < config.numberOfTransactions; i++ {
if err := limiter.Wait(context.Background()); err == nil {
wg.Add(1)

go func() {
go func(i int64) {
defer wg.Done()

target, _ := orbsClient.CreateAccount()
target := defaultTarget
if !config.skipState {
target, _ = orbsClient.CreateAccount()
}

amount := uint64(ctrlRand.Intn(10))

response, _, err2 := h.SendTransaction(OwnerOfAllSupply.PublicKey(), OwnerOfAllSupply.PrivateKey(), "BenchmarkToken", "transfer", uint64(amount), target.AddressAsBytes())
client := clients[i%int64(len(clients))] // select one of the clients

var response *codec.TransactionResponse
var err2 error

if config.async {
response, _, err2 = h.SendTransactionAsyncWithClient(client, OwnerOfAllSupply.PublicKey(), OwnerOfAllSupply.PrivateKey(), "BenchmarkToken", "transfer", amount, target.AddressAsBytes())
} else {
response, _, err2 = h.SendTransactionWithClient(client, OwnerOfAllSupply.PublicKey(), OwnerOfAllSupply.PrivateKey(), "BenchmarkToken", "transfer", amount, target.AddressAsBytes())
}

if err2 != nil {
fmt.Println("Encountered an error sending a transaction while stress testing", err2)
fmt.Println("Encountered an error sending a transaction while stress testing", client.Endpoint, err2)
mutex.Lock()
defer mutex.Unlock()
fmt.Println("")
errors = append(errors, err2)
if response != nil {
errorTransactionStatuses = append(errorTransactionStatuses, string(response.TransactionStatus))
errorTransactionStatuses = append(errorTransactionStatuses, string(response.TransactionStatus), "endpoint", client.Endpoint)
}
}
}()

if i+1%100 == 0 {
fmt.Println(fmt.Sprintf("processed transactions: %d/%d", i+1, config.numberOfTransactions))
}
}(i)
} else {
mutex.Lock()
defer mutex.Unlock()
Expand All @@ -72,6 +98,7 @@ func TestE2EStress(t *testing.T) {

wg.Wait()

// very bad and unreliable metric, does not take into account multiple endpoints yet
txCount := float64(getTransactionCount(t, h) - baseTxCount)

expectedNumberOfTx := float64(100-config.acceptableFailureRate) / 100 * float64(config.numberOfTransactions)
Expand Down

0 comments on commit ecccc93

Please sign in to comment.