From 10b371d41ccbca43a911ce2ac68bcf39f2da5d68 Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Wed, 3 Jan 2024 14:25:57 -0500 Subject: [PATCH 01/12] concurrent processing of get blocks. batches by 50 default --- cmd/monitor/monitor.go | 66 ++++++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index 942687cb..461c1c87 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -34,6 +34,8 @@ var ( zero = big.NewInt(0) observedPendingTxs historicalRange maxDataPoints = 1000 + maxConcurrency = 10 + semaphore = make(chan struct{}, maxConcurrency) ) type ( @@ -277,26 +279,60 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et return nil } - b := backoff.NewExponentialBackOff() - b.MaxElapsedTime = 3 * time.Minute - retryable := func() error { - return rpc.BatchCallContext(ctx, blms) - } - if err := backoff.Retry(retryable, b); err != nil { + err := ms.processBatchesConcurrently(ctx, rpc, blms) + if err != nil { + log.Error().Err(err).Msg("Error processing batches concurrently") return err } - ms.BlocksLock.Lock() - defer ms.BlocksLock.Unlock() - for _, b := range blms { - if b.Error != nil { - continue - } - pb := rpctypes.NewPolyBlock(b.Result.(*rpctypes.RawBlockResponse)) - ms.BlockCache.Add(pb.Number().String(), pb) + return nil +} + +func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error { + subBatchSize := 50 + var wg sync.WaitGroup + var batchErr error + batchErrLock := sync.Mutex{} + + for i := 0; i < len(blms); i += subBatchSize { + wg.Add(1) + semaphore <- struct{}{} + go func(i int) { + defer wg.Done() + end := i + subBatchSize + if end > len(blms) { + end = len(blms) + } + subBatch := blms[i:end] + + b := backoff.NewExponentialBackOff() + b.MaxElapsedTime = 3 * time.Minute + retryable := func() error { + return rpc.BatchCallContext(ctx, subBatch) + } + if err := backoff.Retry(retryable, b); err != nil { + batchErrLock.Lock() + if batchErr == nil { + batchErr = err + } + batchErrLock.Unlock() + } + + for _, elem := range subBatch { + if elem.Error != nil { + log.Error().Str("Method", elem.Method).Interface("Args", elem.Args).Err(elem.Error).Msg("Failed batch element") + } else { + pb := rpctypes.NewPolyBlock(elem.Result.(*rpctypes.RawBlockResponse)) + ms.BlockCache.Add(pb.Number().String(), pb) + } + } + + <-semaphore + }(i) } - return nil + wg.Wait() + return batchErr } func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client) error { From 4cbd12b7beeb5bae83a08f2a3086959f269bc481 Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Wed, 3 Jan 2024 15:06:56 -0500 Subject: [PATCH 02/12] fix error --- cmd/monitor/monitor.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index 461c1c87..625c928f 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -291,7 +291,7 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error { subBatchSize := 50 var wg sync.WaitGroup - var batchErr error + var err error batchErrLock := sync.Mutex{} for i := 0; i < len(blms); i += subBatchSize { @@ -312,8 +312,8 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et } if err := backoff.Retry(retryable, b); err != nil { batchErrLock.Lock() - if batchErr == nil { - batchErr = err + if err == nil { + err = err } batchErrLock.Unlock() } @@ -332,7 +332,7 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et } wg.Wait() - return batchErr + return err } func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client) error { From 76e155b9b33959bf124214d0dddac66d67ad8b7e Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Wed, 3 Jan 2024 15:07:38 -0500 Subject: [PATCH 03/12] lock on error value --- cmd/monitor/monitor.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index 625c928f..461c1c87 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -291,7 +291,7 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error { subBatchSize := 50 var wg sync.WaitGroup - var err error + var batchErr error batchErrLock := sync.Mutex{} for i := 0; i < len(blms); i += subBatchSize { @@ -312,8 +312,8 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et } if err := backoff.Retry(retryable, b); err != nil { batchErrLock.Lock() - if err == nil { - err = err + if batchErr == nil { + batchErr = err } batchErrLock.Unlock() } @@ -332,7 +332,7 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et } wg.Wait() - return err + return batchErr } func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client) error { From 2ef1695552c595a7b2e0ea21ec21d1d57905fd71 Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Thu, 4 Jan 2024 12:03:58 -0500 Subject: [PATCH 04/12] leo comments --- cmd/monitor/monitor.go | 58 +++++++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index 461c1c87..e3fd945c 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -27,15 +27,31 @@ import ( var errBatchRequestsNotSupported = errors.New("batch requests are not supported") var ( - windowSize int - batchSize SafeBatchSize - interval time.Duration - one = big.NewInt(1) - zero = big.NewInt(0) + // windowSize determines the number of blocks to display in the monitor UI at one time. + windowSize int + + // batchSize holds the number of blocks to fetch in one batch. + // It can be adjusted dynamically based on network conditions. + batchSize SafeBatchSize + + // interval specifies the time duration to wait between each update cycle. + interval time.Duration + + // one and zero are big.Int representations of 1 and 0, used for convenience in calculations. + one = big.NewInt(1) + zero = big.NewInt(0) + + // observedPendingTxs holds a historical record of the number of pending transactions. observedPendingTxs historicalRange - maxDataPoints = 1000 - maxConcurrency = 10 - semaphore = make(chan struct{}, maxConcurrency) + + // maxDataPoints defines the maximum number of data points to keep in historical records. + maxDataPoints = 1000 + + // maxConcurrency defines the maximum number of goroutines that can fetch block data concurrently. + maxConcurrency = 10 + + // semaphore is a channel used to control the concurrency of block data fetch operations. + semaphore = make(chan struct{}, maxConcurrency) ) type ( @@ -291,14 +307,16 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error { subBatchSize := 50 var wg sync.WaitGroup - var batchErr error - batchErrLock := sync.Mutex{} + errChan := make(chan error, maxConcurrency) for i := 0; i < len(blms); i += subBatchSize { - wg.Add(1) semaphore <- struct{}{} + wg.Add(1) go func(i int) { - defer wg.Done() + defer func() { + <-semaphore + wg.Done() + }() end := i + subBatchSize if end > len(blms) { end = len(blms) @@ -311,11 +329,7 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et return rpc.BatchCallContext(ctx, subBatch) } if err := backoff.Retry(retryable, b); err != nil { - batchErrLock.Lock() - if batchErr == nil { - batchErr = err - } - batchErrLock.Unlock() + errChan <- err } for _, elem := range subBatch { @@ -327,11 +341,19 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et } } - <-semaphore }(i) } wg.Wait() + + close(errChan) + + var batchErr error + for err := range errChan { + if batchErr == nil { + batchErr = err + } + } return batchErr } From 86d0ed7d520fed99b5659bfd5a69f09d6bb4632c Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Thu, 4 Jan 2024 12:05:54 -0500 Subject: [PATCH 05/12] add lock --- cmd/monitor/monitor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index e3fd945c..a1853284 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -337,7 +337,9 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et log.Error().Str("Method", elem.Method).Interface("Args", elem.Args).Err(elem.Error).Msg("Failed batch element") } else { pb := rpctypes.NewPolyBlock(elem.Result.(*rpctypes.RawBlockResponse)) + ms.BlocksLock.Lock() ms.BlockCache.Add(pb.Number().String(), pb) + ms.BlocksLock.Unlock() } } From 318d9ba941f7a7bb01cb2d6a9a2cbeaef52ff876 Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Thu, 4 Jan 2024 12:56:31 -0500 Subject: [PATCH 06/12] block info --- cmd/monitor/ui/ui.go | 85 ++++++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 22 deletions(-) diff --git a/cmd/monitor/ui/ui.go b/cmd/monitor/ui/ui.go index 499e5381..f6b7dff9 100644 --- a/cmd/monitor/ui/ui.go +++ b/cmd/monitor/ui/ui.go @@ -32,14 +32,35 @@ type UiSkeleton struct { } func GetCurrentBlockInfo(headBlock *big.Int, gasPrice *big.Int, peerCount uint64, pendingCount uint64, chainID *big.Int, blocks []rpctypes.PolyBlock) string { + // Calculate the formatted strings height := fmt.Sprintf("Height: %s", headBlock.String()) time := fmt.Sprintf("Time: %s", time.Now().Format("02 Jan 06 15:04:05 MST")) gasPriceString := fmt.Sprintf("Gas Price: %s gwei", new(big.Int).Div(gasPrice, metrics.UnitShannon).String()) peers := fmt.Sprintf("Peers: %d", peerCount) pendingTx := fmt.Sprintf("Pending Tx: %d", pendingCount) chainIdString := fmt.Sprintf("Chain ID: %s", chainID.String()) - return fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", height, time, gasPriceString, peers, pendingTx, chainIdString) + maxWidthCol1 := max(len(height), len(gasPriceString), len(pendingTx)) + + height = fmt.Sprintf("%-*s", maxWidthCol1, height) + gasPriceString = fmt.Sprintf("%-*s", maxWidthCol1, gasPriceString) + pendingTx = fmt.Sprintf("%-*s", maxWidthCol1, pendingTx) + + line1 := fmt.Sprintf("%s %s", height, time) + line2 := fmt.Sprintf("%s %s", gasPriceString, peers) + line3 := fmt.Sprintf("%s %s", pendingTx, chainIdString) + + return fmt.Sprintf("%s\n%s\n%s", line1, line2, line3) +} + +func max(nums ...int) int { + maxNum := nums[0] + for _, num := range nums[1:] { + if num > maxNum { + maxNum = num + } + } + return maxNum } func GetBlocksList(blocks []rpctypes.PolyBlock) ([]string, string) { @@ -127,7 +148,6 @@ func GetSimpleBlockFields(block rpctypes.PolyBlock) []string { ut := time.Unix(int64(ts), 0) author := "Mined by" - authorAddress := block.Miner().String() if authorAddress == "0x0000000000000000000000000000000000000000" { author = "Signed by" @@ -135,29 +155,50 @@ func GetSimpleBlockFields(block rpctypes.PolyBlock) []string { if err == nil { authorAddress = hex.EncodeToString(signer) } - } - return []string{ - "", - fmt.Sprintf("Block Height: %s", block.Number()), - fmt.Sprintf("Timestamp: %d (%s)", ts, ut.Format(time.RFC3339)), - fmt.Sprintf("Transactions: %d", len(block.Transactions())), - fmt.Sprintf("%s: %s", author, authorAddress), - fmt.Sprintf("Difficulty: %s", block.Difficulty()), - fmt.Sprintf("Size: %d", block.Size()), - fmt.Sprintf("Uncles: %d", len(block.Uncles())), - fmt.Sprintf("Gas used: %d", block.GasUsed()), - fmt.Sprintf("Gas limit: %d", block.GasLimit()), - fmt.Sprintf("Base Fee per gas: %s", block.BaseFee()), - fmt.Sprintf("Extra data: %s", metrics.RawDataToASCII(block.Extra())), - fmt.Sprintf("Hash: %s", block.Hash()), - fmt.Sprintf("Parent Hash: %s", block.ParentHash()), - fmt.Sprintf("Uncle Hash: %s", block.UncleHash()), - fmt.Sprintf("State Root: %s", block.Root()), - fmt.Sprintf("Tx Hash: %s", block.TxHash()), - fmt.Sprintf("Nonce: %d", block.Nonce()), + blockHeight := fmt.Sprintf("Block Height: %s", block.Number()) + timestamp := fmt.Sprintf("Timestamp: %d (%s)", ts, ut.Format(time.RFC3339)) + transactions := fmt.Sprintf("Transactions: %d", len(block.Transactions())) + authorInfo := fmt.Sprintf("%s: %s", author, authorAddress) + difficulty := fmt.Sprintf("Difficulty: %s", block.Difficulty()) + size := fmt.Sprintf("Size: %d", block.Size()) + uncles := fmt.Sprintf("Uncles: %d", len(block.Uncles())) + gasUsed := fmt.Sprintf("Gas used: %d", block.GasUsed()) + gasLimit := fmt.Sprintf("Gas limit: %d", block.GasLimit()) + baseFee := fmt.Sprintf("Base Fee per gas: %s", block.BaseFee()) + extraData := fmt.Sprintf("Extra data: %s", metrics.RawDataToASCII(block.Extra())) + hash := fmt.Sprintf("Hash: %s", block.Hash()) + parentHash := fmt.Sprintf("Parent Hash: %s", block.ParentHash()) + uncleHash := fmt.Sprintf("Uncle Hash: %s", block.UncleHash()) + stateRoot := fmt.Sprintf("State Root: %s", block.Root()) + txHash := fmt.Sprintf("Tx Hash: %s", block.TxHash()) + nonce := fmt.Sprintf("Nonce: %d", block.Nonce()) + + maxWidthCol1 := max(len(blockHeight), len(transactions), len(difficulty), len(size), len(gasUsed), len(baseFee), len(hash), len(stateRoot)) + + blockHeight = fmt.Sprintf("%-*s", maxWidthCol1, blockHeight) + transactions = fmt.Sprintf("%-*s", maxWidthCol1, transactions) + difficulty = fmt.Sprintf("%-*s", maxWidthCol1, difficulty) + size = fmt.Sprintf("%-*s", maxWidthCol1, size) + gasUsed = fmt.Sprintf("%-*s", maxWidthCol1, gasUsed) + baseFee = fmt.Sprintf("%-*s", maxWidthCol1, baseFee) + hash = fmt.Sprintf("%-*s", maxWidthCol1, hash) + stateRoot = fmt.Sprintf("%-*s", maxWidthCol1, stateRoot) + + lines := []string{ + fmt.Sprintf("%s %s", blockHeight, timestamp), + fmt.Sprintf("%s %s", transactions, authorInfo), + fmt.Sprintf("%s %s", difficulty, uncles), + fmt.Sprintf("%s %s", size, gasLimit), + fmt.Sprintf("%s %s", gasUsed, extraData), + fmt.Sprintf("%s %s", baseFee, parentHash), + fmt.Sprintf("%s %s", hash, uncleHash), + fmt.Sprintf("%s %s", stateRoot, txHash), + nonce, } + + return lines } func GetBlockTxTable(block rpctypes.PolyBlock, chainID *big.Int) [][]string { From ea34fe409a9ed27f2a82404647b962045f8ade69 Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Fri, 5 Jan 2024 08:33:29 -0500 Subject: [PATCH 07/12] Update cmd/monitor/monitor.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit leo changes Co-authored-by: Léo Vincent <28714795+leovct@users.noreply.github.com> --- cmd/monitor/monitor.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index a1853284..f77d88bd 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -329,7 +329,11 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et return rpc.BatchCallContext(ctx, subBatch) } if err := backoff.Retry(retryable, b); err != nil { - errChan <- err + select { + case errChan <- err: + default: + log.Error().Msg("Discarding error since error channel is full") + } } for _, elem := range subBatch { From f8499922e49988b5bd5cf9d37f16dd0884c8c44e Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Fri, 5 Jan 2024 08:34:49 -0500 Subject: [PATCH 08/12] final changes to monitor --- cmd/monitor/monitor.go | 10 +++++---- cmd/monitor/ui/ui.go | 51 +++++++++++++++++++++++++++++++++--------- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index f77d88bd..cae16727 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -52,6 +52,9 @@ var ( // semaphore is a channel used to control the concurrency of block data fetch operations. semaphore = make(chan struct{}, maxConcurrency) + + // size of the sub batches to divide and conquer the total batch size with + subBatchSize = 50 ) type ( @@ -305,7 +308,6 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et } func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error { - subBatchSize := 50 var wg sync.WaitGroup errChan := make(chan error, maxConcurrency) @@ -316,6 +318,7 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et defer func() { <-semaphore wg.Done() + close(errChan) }() end := i + subBatchSize if end > len(blms) { @@ -352,8 +355,6 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et wg.Wait() - close(errChan) - var batchErr error for err := range errChan { if batchErr == nil { @@ -465,7 +466,8 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu ms.BlocksLock.RUnlock() renderedBlocks = renderedBlocksTemp - skeleton.Current.Text = ui.GetCurrentBlockInfo(ms.HeadBlock, ms.GasPrice, ms.PeerCount, ms.PendingCount, ms.ChainID, renderedBlocks) + log.Warn().Int("skeleton.Current.Inner.Dy()", skeleton.Current.Inner.Dy()).Int("skeleton.Current.Inner.Dx()", skeleton.Current.Inner.Dx()).Msg("the dimension of the current box") + skeleton.Current.Text = ui.GetCurrentBlockInfo(ms.HeadBlock, ms.GasPrice, ms.PeerCount, ms.PendingCount, ms.ChainID, renderedBlocks, skeleton.Current.Inner.Dx(), skeleton.Current.Inner.Dy()) skeleton.TxPerBlockChart.Data = metrics.GetTxsPerBlock(renderedBlocks) skeleton.GasPriceChart.Data = metrics.GetMeanGasPricePerBlock(renderedBlocks) skeleton.BlockSizeChart.Data = metrics.GetSizePerBlock(renderedBlocks) diff --git a/cmd/monitor/ui/ui.go b/cmd/monitor/ui/ui.go index f6b7dff9..cbab0c6a 100644 --- a/cmd/monitor/ui/ui.go +++ b/cmd/monitor/ui/ui.go @@ -31,26 +31,54 @@ type UiSkeleton struct { Receipts *widgets.List } -func GetCurrentBlockInfo(headBlock *big.Int, gasPrice *big.Int, peerCount uint64, pendingCount uint64, chainID *big.Int, blocks []rpctypes.PolyBlock) string { - // Calculate the formatted strings +func GetCurrentBlockInfo(headBlock *big.Int, gasPrice *big.Int, peerCount uint64, pendingCount uint64, chainID *big.Int, blocks []rpctypes.PolyBlock, dx int, dy int) string { + // Return an appropriate message if dy is 0 or less. + if dy <= 0 { + return "Invalid display configuration." + } + height := fmt.Sprintf("Height: %s", headBlock.String()) - time := fmt.Sprintf("Time: %s", time.Now().Format("02 Jan 06 15:04:05 MST")) + timeInfo := fmt.Sprintf("Time: %s", time.Now().Format("02 Jan 06 15:04:05 MST")) gasPriceString := fmt.Sprintf("Gas Price: %s gwei", new(big.Int).Div(gasPrice, metrics.UnitShannon).String()) peers := fmt.Sprintf("Peers: %d", peerCount) pendingTx := fmt.Sprintf("Pending Tx: %d", pendingCount) chainIdString := fmt.Sprintf("Chain ID: %s", chainID.String()) - maxWidthCol1 := max(len(height), len(gasPriceString), len(pendingTx)) + info := []string{height, timeInfo, gasPriceString, peers, pendingTx, chainIdString} + columns := len(info) / dy + if len(info)%dy != 0 { + columns += 1 // Add an extra column for the remaining items + } - height = fmt.Sprintf("%-*s", maxWidthCol1, height) - gasPriceString = fmt.Sprintf("%-*s", maxWidthCol1, gasPriceString) - pendingTx = fmt.Sprintf("%-*s", maxWidthCol1, pendingTx) + // Calculate the width of each column based on the longest string in each column + columnWidths := make([]int, columns) + for i := 0; i < columns; i++ { + for j := 0; j < dy; j++ { + index := i*dy + j + if index < len(info) && len(info[index]) > columnWidths[i] { + columnWidths[i] = len(info[index]) + } + } + // Add padding and ensure it doesn't exceed 'dx' + columnWidths[i] += 5 // Adjust padding as needed + if columnWidths[i] > dx { + columnWidths[i] = dx + } + } - line1 := fmt.Sprintf("%s %s", height, time) - line2 := fmt.Sprintf("%s %s", gasPriceString, peers) - line3 := fmt.Sprintf("%s %s", pendingTx, chainIdString) + var formattedInfo strings.Builder + for i := 0; i < dy; i++ { + for j := 0; j < columns; j++ { + index := j*dy + i + if index < len(info) { + formatString := fmt.Sprintf("%%-%ds", columnWidths[j]) + formattedInfo.WriteString(fmt.Sprintf(formatString, info[index])) + } + } + formattedInfo.WriteString("\n") + } - return fmt.Sprintf("%s\n%s\n%s", line1, line2, line3) + return formattedInfo.String() } func max(nums ...int) int { @@ -364,6 +392,7 @@ func SetUISkeleton() (blockList *widgets.List, blockInfo *widgets.List, transact blockInfo = widgets.NewList() blockInfo.TextStyle = ui.NewStyle(ui.ColorWhite) blockInfo.Title = "Block Information" + blockInfo.WrapText = true transactionInfo = widgets.NewTable() transactionInfo.TextStyle = ui.NewStyle(ui.ColorWhite) From a4b7a45fe88f949f38889730a19ff9948dc0d22c Mon Sep 17 00:00:00 2001 From: John Hilliard Date: Tue, 9 Jan 2024 17:23:53 -0500 Subject: [PATCH 09/12] feat: adding an auth token option for monitor --- cmd/monitor/cmd.go | 3 +++ cmd/monitor/monitor.go | 19 ++++++++++++++++++- doc/polycli_monitor.md | 1 + 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/cmd/monitor/cmd.go b/cmd/monitor/cmd.go index fcc1e5a7..1896b177 100644 --- a/cmd/monitor/cmd.go +++ b/cmd/monitor/cmd.go @@ -20,6 +20,7 @@ var ( batchSizeValue string blockCacheLimit int intervalStr string + authToken string defaultBatchSize = 100 ) @@ -79,6 +80,8 @@ func init() { MonitorCmd.PersistentFlags().StringVarP(&batchSizeValue, "batch-size", "b", "auto", "Number of requests per batch") MonitorCmd.PersistentFlags().IntVarP(&blockCacheLimit, "cache-limit", "c", 200, "Number of cached blocks for the LRU block data structure (Min 100)") MonitorCmd.PersistentFlags().StringVarP(&intervalStr, "interval", "i", "5s", "Amount of time between batch block rpc calls") + MonitorCmd.PersistentFlags().StringVarP(&authToken, "auth-token", "a", "", "An auth token to be used while making HTTP requests") + } func checkFlags() (err error) { diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index 942687cb..a2302bee 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "net/http" "sync" "time" @@ -73,9 +74,25 @@ const ( monitorModeTransaction ) +func addAuthToken(h http.Header) error { + if authToken == "" { + return fmt.Errorf("unable to add empty auth token") + } + + h.Set("Authorization", fmt.Sprintf("Bearer %s", authToken)) + return nil +} + func monitor(ctx context.Context) error { // Dial rpc. - rpc, err := ethrpc.DialContext(ctx, rpcUrl) + var rpc *ethrpc.Client + var err error + if authToken == "" { + rpc, err = ethrpc.DialContext(ctx, rpcUrl) + } else { + rpc, err = ethrpc.DialOptions(ctx, rpcUrl, ethrpc.WithHTTPAuth(addAuthToken)) + } + if err != nil { log.Error().Err(err).Msg("Unable to dial rpc") return err diff --git a/doc/polycli_monitor.md b/doc/polycli_monitor.md index 7b7ba143..980341ae 100644 --- a/doc/polycli_monitor.md +++ b/doc/polycli_monitor.md @@ -28,6 +28,7 @@ If you're experiencing missing blocks, try adjusting the `--batch-size` and `--i ## Flags ```bash + -a, --auth-token string An auth token to be used while making HTTP requests -b, --batch-size string Number of requests per batch (default "auto") -c, --cache-limit int Number of cached blocks for the LRU block data structure (Min 100) (default 200) -h, --help help for monitor From 1d73a5154e817613ba735039f33fd82e58808f20 Mon Sep 17 00:00:00 2001 From: John Hilliard Date: Tue, 9 Jan 2024 19:57:54 -0500 Subject: [PATCH 10/12] feat: switching to generic headers --- cmd/monitor/cmd.go | 25 +++++++++++++++++++------ cmd/monitor/monitor.go | 12 +++++------- doc/polycli_monitor.md | 2 +- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/cmd/monitor/cmd.go b/cmd/monitor/cmd.go index 1896b177..edfd0146 100644 --- a/cmd/monitor/cmd.go +++ b/cmd/monitor/cmd.go @@ -4,6 +4,7 @@ import ( _ "embed" "fmt" "strconv" + "strings" "sync" "time" @@ -16,11 +17,12 @@ var ( usage string // flags - rpcUrl string - batchSizeValue string - blockCacheLimit int - intervalStr string - authToken string + rpcUrl string + batchSizeValue string + blockCacheLimit int + intervalStr string + rawHttpHeaders []string + parsedHttpHeaders map[string]string defaultBatchSize = 100 ) @@ -80,7 +82,7 @@ func init() { MonitorCmd.PersistentFlags().StringVarP(&batchSizeValue, "batch-size", "b", "auto", "Number of requests per batch") MonitorCmd.PersistentFlags().IntVarP(&blockCacheLimit, "cache-limit", "c", 200, "Number of cached blocks for the LRU block data structure (Min 100)") MonitorCmd.PersistentFlags().StringVarP(&intervalStr, "interval", "i", "5s", "Amount of time between batch block rpc calls") - MonitorCmd.PersistentFlags().StringVarP(&authToken, "auth-token", "a", "", "An auth token to be used while making HTTP requests") + MonitorCmd.PersistentFlags().StringSliceVarP(&rawHttpHeaders, "header", "H", nil, "Header to be added to each HTTP request. E.g. \"X-First-Name: Joe\"") } @@ -89,6 +91,17 @@ func checkFlags() (err error) { return } + if rawHttpHeaders != nil { + parsedHttpHeaders = make(map[string]string, len(rawHttpHeaders)) + for _, rh := range rawHttpHeaders { + pieces := strings.SplitN(rh, ":", 2) + if len(pieces) != 2 { + return fmt.Errorf("the header value should have been split into 2 pieces, but got %d", len(pieces)) + } + parsedHttpHeaders[pieces[0]] = pieces[1] + } + } + interval, err = time.ParseDuration(intervalStr) if err != nil { return err diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index a2302bee..17348843 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -74,12 +74,10 @@ const ( monitorModeTransaction ) -func addAuthToken(h http.Header) error { - if authToken == "" { - return fmt.Errorf("unable to add empty auth token") +func addHttpHeaders(h http.Header) error { + for k, v := range parsedHttpHeaders { + h.Set(k, v) } - - h.Set("Authorization", fmt.Sprintf("Bearer %s", authToken)) return nil } @@ -87,10 +85,10 @@ func monitor(ctx context.Context) error { // Dial rpc. var rpc *ethrpc.Client var err error - if authToken == "" { + if parsedHttpHeaders == nil { rpc, err = ethrpc.DialContext(ctx, rpcUrl) } else { - rpc, err = ethrpc.DialOptions(ctx, rpcUrl, ethrpc.WithHTTPAuth(addAuthToken)) + rpc, err = ethrpc.DialOptions(ctx, rpcUrl, ethrpc.WithHTTPAuth(addHttpHeaders)) } if err != nil { diff --git a/doc/polycli_monitor.md b/doc/polycli_monitor.md index 980341ae..0b0939dd 100644 --- a/doc/polycli_monitor.md +++ b/doc/polycli_monitor.md @@ -28,9 +28,9 @@ If you're experiencing missing blocks, try adjusting the `--batch-size` and `--i ## Flags ```bash - -a, --auth-token string An auth token to be used while making HTTP requests -b, --batch-size string Number of requests per batch (default "auto") -c, --cache-limit int Number of cached blocks for the LRU block data structure (Min 100) (default 200) + -H, --header strings Header to be added to each HTTP request. E.g. "X-First-Name: Joe" -h, --help help for monitor -i, --interval string Amount of time between batch block rpc calls (default "5s") -r, --rpc-url string The RPC endpoint url (default "http://localhost:8545") From e3bf5513cceaf4b1418be7b99017db7e4ea7f2cb Mon Sep 17 00:00:00 2001 From: John Hilliard Date: Tue, 9 Jan 2024 20:32:59 -0500 Subject: [PATCH 11/12] fix: https://github.com/maticnetwork/polygon-cli/pull/181#pullrequestreview-1808576970 --- cmd/monitor/monitor.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index cae16727..3b8c700a 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -309,7 +309,8 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error { var wg sync.WaitGroup - errChan := make(chan error, maxConcurrency) + var errs []error = make([]error, 0) + var errorsMutex sync.Mutex for i := 0; i < len(blms); i += subBatchSize { semaphore <- struct{}{} @@ -318,7 +319,6 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et defer func() { <-semaphore wg.Done() - close(errChan) }() end := i + subBatchSize if end > len(blms) { @@ -332,11 +332,11 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et return rpc.BatchCallContext(ctx, subBatch) } if err := backoff.Retry(retryable, b); err != nil { - select { - case errChan <- err: - default: - log.Error().Msg("Discarding error since error channel is full") - } + log.Error().Err(err).Msg("unable to retry") + errorsMutex.Lock() + errs = append(errs, err) + errorsMutex.Unlock() + return } for _, elem := range subBatch { @@ -355,13 +355,7 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et wg.Wait() - var batchErr error - for err := range errChan { - if batchErr == nil { - batchErr = err - } - } - return batchErr + return errors.Join(errs...) } func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client) error { @@ -380,6 +374,8 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu grid.SetRect(0, 0, termWidth, termHeight) blockGrid.SetRect(0, 0, termWidth, termHeight) transactionGrid.SetRect(0, 0, termWidth, termHeight) + // Initial render needed I assume to avoid the first bad redraw + termui.Render(grid) var setBlock = false var renderedBlocks rpctypes.SortableBlocks From 7e8d80a2a6999ec990dde7ab7b63e2fcb2899c35 Mon Sep 17 00:00:00 2001 From: John Hilliard Date: Tue, 9 Jan 2024 21:25:52 -0500 Subject: [PATCH 12/12] feat: adding header option to loadtest --- cmd/loadtest/app.go | 11 +++++++++++ cmd/loadtest/loadtest.go | 10 +++++++++- cmd/monitor/cmd.go | 11 +++-------- cmd/monitor/monitor.go | 10 +--------- doc/polycli_loadtest.md | 1 + util/util.go | 21 +++++++++++++++++++++ 6 files changed, 46 insertions(+), 18 deletions(-) diff --git a/cmd/loadtest/app.go b/cmd/loadtest/app.go index 600c187c..e38056c6 100644 --- a/cmd/loadtest/app.go +++ b/cmd/loadtest/app.go @@ -78,6 +78,7 @@ type ( ContractCallFunctionArgs *[]string ContractCallPayable *bool InscriptionContent *string + RawHTTPHeaders *[]string // Computed CurrentGasPrice *big.Int @@ -93,6 +94,7 @@ type ( Mode loadTestMode ParsedModes []loadTestMode MultiMode bool + ParsedHTTPHeaders map[string]string } ) @@ -190,6 +192,14 @@ func checkLoadtestFlags() error { return errors.New("the backoff factor needs to be non-zero positive") } + if ltp.RawHTTPHeaders != nil { + var err error + inputLoadTestParams.ParsedHTTPHeaders, err = util.ParseHeaderStrings(*ltp.RawHTTPHeaders) + if err != nil { + return err + } + } + return nil } @@ -260,6 +270,7 @@ inscription - sending inscription transactions`) ltp.ContractCallFunctionArgs = LoadtestCmd.Flags().StringSlice("function-arg", []string{}, `The arguments that will be passed to a contract function call. This must be paired up with "--mode contract-call" and "--contract-address". Args can be passed multiple times: "--function-arg 'test' --function-arg 999" or comma separated values "--function-arg "test",9". The ordering of the arguments must match the ordering of the function parameters.`) ltp.ContractCallPayable = LoadtestCmd.Flags().Bool("contract-call-payable", false, "Use this flag if the function is payable, the value amount passed will be from --eth-amount. This must be paired up with --mode contract-call and --contract-address") ltp.InscriptionContent = LoadtestCmd.Flags().String("inscription-content", `data:,{"p":"erc-20","op":"mint","tick":"TEST","amt":"1"}`, "The inscription content that will be encoded as calldata. This must be paired up with --mode inscription") + ltp.RawHTTPHeaders = LoadtestCmd.Flags().StringSliceP("header", "H", nil, "Header to be added to each HTTP request. E.g. \"X-First-Name: Joe\"") inputLoadTestParams = *ltp diff --git a/cmd/loadtest/loadtest.go b/cmd/loadtest/loadtest.go index 077f9ce6..273f7a51 100644 --- a/cmd/loadtest/loadtest.go +++ b/cmd/loadtest/loadtest.go @@ -347,7 +347,15 @@ func runLoadTest(ctx context.Context) error { } // Dial the Ethereum RPC server. - rpc, err := ethrpc.DialContext(ctx, *inputLoadTestParams.RPCUrl) + var rpc *ethrpc.Client + var err error + if inputLoadTestParams.ParsedHTTPHeaders == nil { + log.Trace().Msg("No HeadersAdding custom headers") + rpc, err = ethrpc.DialContext(ctx, *inputLoadTestParams.RPCUrl) + } else { + log.Trace().Msg("Adding custom headers") + rpc, err = ethrpc.DialOptions(ctx, *inputLoadTestParams.RPCUrl, ethrpc.WithHTTPAuth(util.GetHTTPAuth(inputLoadTestParams.ParsedHTTPHeaders))) + } if err != nil { log.Error().Err(err).Msg("Unable to dial rpc") return err diff --git a/cmd/monitor/cmd.go b/cmd/monitor/cmd.go index edfd0146..7078d868 100644 --- a/cmd/monitor/cmd.go +++ b/cmd/monitor/cmd.go @@ -4,7 +4,6 @@ import ( _ "embed" "fmt" "strconv" - "strings" "sync" "time" @@ -92,13 +91,9 @@ func checkFlags() (err error) { } if rawHttpHeaders != nil { - parsedHttpHeaders = make(map[string]string, len(rawHttpHeaders)) - for _, rh := range rawHttpHeaders { - pieces := strings.SplitN(rh, ":", 2) - if len(pieces) != 2 { - return fmt.Errorf("the header value should have been split into 2 pieces, but got %d", len(pieces)) - } - parsedHttpHeaders[pieces[0]] = pieces[1] + parsedHttpHeaders, err = util.ParseHeaderStrings(rawHttpHeaders) + if err != nil { + return err } } diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index 34aabf2d..9618ae44 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math/big" - "net/http" "sync" "time" @@ -95,13 +94,6 @@ const ( monitorModeTransaction ) -func addHttpHeaders(h http.Header) error { - for k, v := range parsedHttpHeaders { - h.Set(k, v) - } - return nil -} - func monitor(ctx context.Context) error { // Dial rpc. var rpc *ethrpc.Client @@ -109,7 +101,7 @@ func monitor(ctx context.Context) error { if parsedHttpHeaders == nil { rpc, err = ethrpc.DialContext(ctx, rpcUrl) } else { - rpc, err = ethrpc.DialOptions(ctx, rpcUrl, ethrpc.WithHTTPAuth(addHttpHeaders)) + rpc, err = ethrpc.DialOptions(ctx, rpcUrl, ethrpc.WithHTTPAuth(util.GetHTTPAuth(parsedHttpHeaders))) } if err != nil { diff --git a/doc/polycli_loadtest.md b/doc/polycli_loadtest.md index f337c20d..0f4d09a6 100644 --- a/doc/polycli_loadtest.md +++ b/doc/polycli_loadtest.md @@ -122,6 +122,7 @@ The codebase has a contract that used for load testing. It's written in Yul and --function-signature string The contract's function signature that will be called. The format is '()'. This must be paired up with '--mode contract-call' and '--contract-address'. If the function requires parameters you can pass them with '--function-arg '. --gas-limit uint In environments where the gas limit can't be computed on the fly, we can specify it manually. This can also be used to avoid eth_estimateGas --gas-price uint In environments where the gas price can't be determined automatically, we can specify it manually + -H, --header strings Header to be added to each HTTP request. E.g. "X-First-Name: Joe" -h, --help help for loadtest --inscription-content string The inscription content that will be encoded as calldata. This must be paired up with --mode inscription (default "data:,{\"p\":\"erc-20\",\"op\":\"mint\",\"tick\":\"TEST\",\"amt\":\"1\"}") -i, --iterations uint If we're making contract calls, this controls how many times the contract will execute the instruction in a loop. If we are making ERC721 Mints, this indicates the minting batch size (default 1) diff --git a/util/util.go b/util/util.go index 2116a6bc..7dbdcf09 100644 --- a/util/util.go +++ b/util/util.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "strconv" "strings" "time" @@ -216,3 +217,23 @@ func BlockUntilSuccessful(ctx context.Context, c *ethclient.Client, retryable fu b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(5*time.Second), 24), ctx) return backoff.Retry(retryable, b) } +func GetHTTPAuth(parsedHttpHeaders map[string]string) func(http.Header) error { + return func(h http.Header) error { + for k, v := range parsedHttpHeaders { + h.Set(k, v) + } + return nil + } +} + +func ParseHeaderStrings(rawHTTPHeaders []string) (map[string]string, error) { + parsedHttpHeaders := make(map[string]string, len(rawHTTPHeaders)) + for _, rh := range rawHTTPHeaders { + pieces := strings.SplitN(rh, ":", 2) + if len(pieces) != 2 { + return nil, fmt.Errorf("the header value should have been split into 2 pieces, but got %d", len(pieces)) + } + parsedHttpHeaders[pieces[0]] = pieces[1] + } + return parsedHttpHeaders, nil +}