Skip to content

Commit

Permalink
w1p: Add no_wait mode to conbench
Browse files Browse the repository at this point in the history
  • Loading branch information
abukosek committed Oct 5, 2020
1 parent 693e44a commit 322c811
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 18 deletions.
81 changes: 65 additions & 16 deletions go/extra/conbench/cmd/conbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
// Use test entity for funding?
CfgUseTestEntity = "use_test_entity"

// CfgNoWait uses SubmitTxNoWait instead of SubmitTx, submits txns for the
// given amount of time, then stops.
CfgNoWait = "no_wait"

// Gas price (should be set to the minimum gas price of validators).
CfgGasPrice = "gas_price"

Expand Down Expand Up @@ -84,7 +88,7 @@ type localAccount struct {
cachedGas uint64
}

func transfer(ctx context.Context, cc consensus.ClientBackend, from *localAccount, toAddr staking.Address, amount uint64, noCache bool) error {
func transfer(ctx context.Context, cc consensus.ClientBackend, from *localAccount, toAddr staking.Address, amount uint64, noCache, noWait bool) error {
var err error

// Get sender's nonce if not yet cached (or if we're ignoring cache).
Expand Down Expand Up @@ -138,16 +142,21 @@ func transfer(ctx context.Context, cc consensus.ClientBackend, from *localAccoun
// Increment cached nonce.
atomic.AddUint64(&from.cachedNonce, 1)

// Submit with timeout to avoid blocking forever if the client node
// is skipping CheckTx checks. The timeout should be set large enough
// for the network to handle the submission.
timeout := viper.GetDuration(CfgSubmitTxTimeout)
submissionCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if err = cc.SubmitTx(submissionCtx, signedTx); err != nil {
return err
if noWait {
// Submit transaction, but don't wait for it to be included in a block.
return cc.SubmitTxNoWait(ctx, signedTx)
} else {
// Submit with timeout to avoid blocking forever if the client node
// is skipping CheckTx checks. The timeout should be set large enough
// for the network to handle the submission.
timeout := viper.GetDuration(CfgSubmitTxTimeout)
submissionCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if err = cc.SubmitTx(submissionCtx, signedTx); err != nil {
return err
}
return nil
}
return nil
}

func refund(ctx context.Context, cc consensus.ClientBackend, sc staking.Backend, from *localAccount, toAddr staking.Address) error {
Expand All @@ -169,7 +178,7 @@ func refund(ctx context.Context, cc consensus.ClientBackend, sc staking.Backend,
}

// We don't want refunds to fail, so disable caching.
if err = transfer(ctx, cc, from, toAddr, amount, true); err != nil {
if err = transfer(ctx, cc, from, toAddr, amount, true, false); err != nil {
return fmt.Errorf("unable to refund from account: %w", err)
}

Expand Down Expand Up @@ -367,7 +376,7 @@ func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo
account[a].cachedGas = uint64(estGas)

// Each account gets perAccountFunds tokens.
if errr := transfer(ctx, cc, &fundingAcct, account[a].addr, perAccountFunds, true); errr != nil {
if errr := transfer(ctx, cc, &fundingAcct, account[a].addr, perAccountFunds, true, false); errr != nil {
// An error has happened while funding, make sure to refund the
// funding account from the accounts funded until this point.
logger.Error("error while funding, attempting to refund account")
Expand All @@ -380,6 +389,9 @@ func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo
}
}

noWait := viper.IsSet(CfgNoWait)
noWaitDuration := viper.GetDuration(CfgNoWait)

logger.Info("starting benchmark", "num_accounts", numAccounts)
startStatus, err := cc.GetStatus(ctx)
if err != nil {
Expand All @@ -399,6 +411,7 @@ func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo
totalSubmitTimeNs uint64
numSubmitSamples uint64
numSubmitErrors uint64
gottaStopFast uint32
)

// Perform benchmark in parallel, one goroutine per account.
Expand All @@ -407,12 +420,17 @@ func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo
go func(idx uint64) {
var noCache bool
for s := uint64(0); s < numSamples; s++ {
if atomic.LoadUint32(&gottaStopFast) > 0 {
// Terminate.
return
}

fromIdx := idx
toIdx := idx
toAddr := account[toIdx].addr

startT := time.Now()
if err = transfer(ctx, cc, &account[fromIdx], toAddr, 1, noCache); err != nil {
if err = transfer(ctx, cc, &account[fromIdx], toAddr, 1, noCache, noWait); err != nil {
atomic.AddUint64(&numSubmitErrors, 1)
// Disable cache for the next sample, just in case
// we messed up the nonce or if the gas cost changed.
Expand All @@ -428,9 +446,14 @@ func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo
}(uint64(a))
}

// Wait for all goroutines to finish.
for i := uint64(0); i < numAccounts*numSamples; i++ {
<-doneCh
if !noWait {
// Wait for all goroutines to finish.
for i := uint64(0); i < numAccounts*numSamples; i++ {
<-doneCh
}
} else {
time.Sleep(noWaitDuration)
atomic.StoreUint32(&gottaStopFast, 1)
}

benchmarkDuration := time.Since(benchmarkStartT)
Expand All @@ -450,13 +473,17 @@ func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo
// transactions per second and other stats.
// Note that we count all transactions, not just the ones made
// by this benchmark.
//
// In addition, do a sliding window for the max avg tps.
var totalTxs uint64
var maxTxs uint64
minTxs := uint64(18446744073709551615)
txsPerBlock := make([]uint64, 0)
txBytesPerBlock := make([]uint64, 0)
blockDeltaT := make([]float64, 0)
blockT := make([]time.Time, 0)
var prevBlockT time.Time

for height := benchmarkStartHeight; height <= benchmarkStopHeight; height++ {
// Count number of transactions.
txs, grr := cc.GetTransactions(ctx, height)
Expand Down Expand Up @@ -492,6 +519,7 @@ func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo
}
blockDeltaT = append(blockDeltaT, blk.Time.Sub(prevBlockT).Seconds())
prevBlockT = blk.Time
blockT = append(blockT, blk.Time)
}

tps := float64(totalTxs) / benchmarkDuration.Seconds()
Expand All @@ -502,6 +530,24 @@ func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo

avgSubmitTimeNs := float64(totalSubmitTimeNs) / float64(numSubmitSamples)

// Do a sliding window over the block size array to get the max avg tps.
var bestAvgTps float64
for slidingWindowSize := 1; slidingWindowSize <= 32; slidingWindowSize++ {
for i := range txsPerBlock {
var curAvgTps float64
j := i
for ; j < i+slidingWindowSize && j < len(txsPerBlock); j++ {
curAvgTps += float64(txsPerBlock[j])
}
curAvgTps /= blockT[j-1].Sub(blockT[i]).Seconds()
// Can divide by zero above if blocks are too close together.
// XXX: Find a workaround for this.
if curAvgTps > bestAvgTps && !math.IsInf(curAvgTps, 0) {
bestAvgTps = curAvgTps
}
}
}

logger.Info("benchmark finished",
// Number of accounts involved in benchmark (level of parallelism).
"num_accounts", numAccounts,
Expand Down Expand Up @@ -535,6 +581,8 @@ func doRun(cmd *cobra.Command, args []string) error { // nolint: gocyclo
"block_sizes_bytes", strings.Trim(fmt.Sprint(txBytesPerBlock), "[]"),
// Time delta between blocks (in seconds).
"block_delta_t_s", strings.Trim(fmt.Sprint(blockDeltaT), "[]"),
// Maximum average tps over a sliding window.
"max_avg_tps", bestAvgTps,
)

// Refund money into original funding account.
Expand All @@ -557,6 +605,7 @@ func init() {
fs.Uint64(CfgNumAccounts, 10, "Number of accounts to create for benchmarking (also level of parallelism)")
fs.Uint64(CfgNumSamples, 30, "Number of samples (transfers) per account")
fs.Duration(CfgSubmitTxTimeout, 10*time.Second, "Timeout for SubmitTx (set this based on network parameters)")
fs.Duration(CfgNoWait, 10*time.Second, "Use SubmitTxNoWait instead of SubmitTx (spam transactions) for given amount of time")
fs.Bool(CfgUseTestEntity, false, "Use test entity for funding (only for testing)")
fs.Uint64(CfgGasPrice, 1, "Gas price (should be set to the minimum gas price of validators)")
fs.Bool(CfgFundAndExit, false, "Only fund accounts and exit")
Expand Down
24 changes: 22 additions & 2 deletions go/extra/conbench/conbench-plot.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ BS_PLOT="conbench-block-size.${NOW}.png"
BSS_PLOT="conbench-block-sizes.${NOW}.png"
BSSB_PLOT="conbench-block-sizes-bytes.${NOW}.png"
BTS_PLOT="conbench-block-times.${NOW}.png"
MATPS_PLOT="conbench-max-avg-tps.${NOW}.png"

# Get the root directory of the repository.
ROOT="$(cd $(dirname $0)/../../../; pwd -P)"
Expand Down Expand Up @@ -77,6 +78,7 @@ BS_DATA_FILE="$(mktemp -t oasis-conbench-bs-plot-XXXXXXXXXX)"
BSS_DATA_FILE="$(mktemp -t oasis-conbench-bss-plot-XXXXXXXXXX)"
BSSB_DATA_FILE="$(mktemp -t oasis-conbench-bssb-plot-XXXXXXXXXX)"
BTS_DATA_FILE="$(mktemp -t oasis-conbench-bts-plot-XXXXXXXXXX)"
MATPS_DATA_FILE="$(mktemp -t oasis-conbench-max-avg-tps-plot-XXXXXXXXXX)"

ARGS="$@"

Expand All @@ -99,7 +101,7 @@ run_bench() {

# Run benchmark.
printf "${GRN}*** Running benchmark for ${num_accounts} accounts...${OFF}\n"
conbench --skip_funding --num_accounts ${num_accounts} > "${output}"
conbench --skip_funding --no_wait=60s --num_samples=100 --num_accounts ${num_accounts} > "${output}"

local results=$(fgrep 'msg="benchmark finished"' "${output}")
echo "${results}" | tee -a "${RAW_DATA}"
Expand All @@ -118,6 +120,8 @@ run_bench() {

local bts=$(echo "${results}" | sed -r 's/[[:alnum:]_]+=/\n&/g' | awk -F= '$1=="block_delta_t_s"{print $2}' | tr -d '"')

local matps=$(echo "${results}" | sed -r 's/[[:alnum:]_]+=/\n&/g' | awk -F= '$1=="max_avg_tps"{print $2}')

rm "${output}"

if [[ "${no_plot}" == "no_plot" ]]; then
Expand Down Expand Up @@ -148,6 +152,8 @@ run_bench() {
echo "${num_accounts} ${blk} ${bt}" >> "${BTS_DATA_FILE}"
blk=$((blk+1))
done

echo "${num_accounts} ${matps}" >> "${MATPS_DATA_FILE}"
}

ACCT="10, 50, 100, 175, 250, 325, 425, 500, 650, 800, 900"
Expand Down Expand Up @@ -304,7 +310,21 @@ set palette defined (0 "dark-violet", 1 "blue", 2 "cyan", 3 "yellow", 4 "red")
splot '${BTS_DATA_FILE}' with impulses lw 2 lc palette notitle
EOF

rm "${TPS_DATA_FILE}" "${ST_DATA_FILE}" "${BS_DATA_FILE}" "${BSS_DATA_FILE}" "${BSSB_DATA_FILE}" "${BTS_DATA_FILE}"
# Plot max avg TPS graph.
gnuplot <<- EOF
set title "Maximum average transactions per second"
set xlabel "Number of parallel accounts"
set xtics (${ACCT})
set ylabel "transactions/s" textcolor lt 1
set autoscale y
set grid
set term png
set output "${MATPS_PLOT}"
plot '${MATPS_DATA_FILE}' using 1:2 with linespoint notitle
EOF


rm "${TPS_DATA_FILE}" "${ST_DATA_FILE}" "${BS_DATA_FILE}" "${BSS_DATA_FILE}" "${BSSB_DATA_FILE}" "${BTS_DATA_FILE}" "${MATPS_DATA_FILE}"

printf "${GRN}*** Refunding original funding account...${OFF}\n"
conbench --num_accounts ${MAX_ACCTS} --refund_and_exit
Expand Down

0 comments on commit 322c811

Please sign in to comment.