From 0d96e18985b7633d3232a67a00afb37236834d65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrei=20B=C4=83ncioiu?= Date: Thu, 31 Oct 2024 16:53:22 +0200 Subject: [PATCH] Sketch parallelization (not tested). --- txcache/selectionUsingMerges.go | 58 +++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/txcache/selectionUsingMerges.go b/txcache/selectionUsingMerges.go index 9eaab5e1..57c13245 100644 --- a/txcache/selectionUsingMerges.go +++ b/txcache/selectionUsingMerges.go @@ -1,7 +1,16 @@ package txcache +import "sync" + type BunchOfTransactions []*WrappedTransaction +const numJobsForMerging = 4 + +type mergingJob struct { + input []BunchOfTransactions + output BunchOfTransactions +} + func (cache *TxCache) selectTransactionsUsingMerges(gasRequested uint64) BunchOfTransactions { senders := cache.getSenders() bunches := make([]BunchOfTransactions, 0, len(senders)) @@ -10,12 +19,7 @@ func (cache *TxCache) selectTransactionsUsingMerges(gasRequested uint64) BunchOf bunches = append(bunches, sender.getTxsWithoutGaps()) } - // If number of bunches is odd, add a phony bunch (to ease pairing logic). - if len(bunches)%2 == 1 { - bunches = append(bunches, make(BunchOfTransactions, 0)) - } - - mergedBunch := mergeBunchesOfTransactions(bunches)[0] + mergedBunch := mergeBunchesOfTransactionsInParallel(bunches) selection := selectUntilReachedGasRequested(mergedBunch, gasRequested) return selection } @@ -34,6 +38,48 @@ func selectUntilReachedGasRequested(bunch BunchOfTransactions, gasRequested uint return bunch } +func mergeBunchesOfTransactionsInParallel(bunches []BunchOfTransactions) BunchOfTransactions { + // If number of bunches is odd, add a phony bunch (to ease pairing logic). + if len(bunches)%2 == 1 { + bunches = append(bunches, make(BunchOfTransactions, 0)) + } + + jobs := make([]*mergingJob, numJobsForMerging) + + for i := 0; i < numJobsForMerging; i++ { + jobs[i] = &mergingJob{ + input: make([]BunchOfTransactions, 0, len(bunches)/numJobsForMerging), + } + } + + for i, bunch := range bunches { + jobs[i%numJobsForMerging].input = append(jobs[i%numJobsForMerging].input, bunch) + } + + // Run jobs in parallel + wg := sync.WaitGroup{} + + for _, job := range jobs { + wg.Add(1) + + go func(job *mergingJob) { + job.output = mergeBunchesOfTransactions(job.input)[0] + defer wg.Done() + }(job) + } + + wg.Wait() + + // Merge the results of the jobs + outputBunchesOfJobs := make([]BunchOfTransactions, 0, numJobsForMerging) + + for _, job := range jobs { + outputBunchesOfJobs = append(outputBunchesOfJobs, job.output) + } + + return mergeBunchesOfTransactions(outputBunchesOfJobs)[0] +} + func mergeBunchesOfTransactions(bunches []BunchOfTransactions) []BunchOfTransactions { if len(bunches) == 1 { return bunches