Skip to content

Commit

Permalink
Sketch parallelization (not tested).
Browse files Browse the repository at this point in the history
  • Loading branch information
andreibancioiu committed Oct 31, 2024
1 parent d1063be commit 0d96e18
Showing 1 changed file with 52 additions and 6 deletions.
58 changes: 52 additions & 6 deletions txcache/selectionUsingMerges.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package txcache

import "sync"

type BunchOfTransactions []*WrappedTransaction

const numJobsForMerging = 4

type mergingJob struct {
input []BunchOfTransactions
output BunchOfTransactions
}

func (cache *TxCache) selectTransactionsUsingMerges(gasRequested uint64) BunchOfTransactions {
senders := cache.getSenders()
bunches := make([]BunchOfTransactions, 0, len(senders))
Expand All @@ -10,12 +19,7 @@ func (cache *TxCache) selectTransactionsUsingMerges(gasRequested uint64) BunchOf
bunches = append(bunches, sender.getTxsWithoutGaps())
}

// If number of bunches is odd, add a phony bunch (to ease pairing logic).
if len(bunches)%2 == 1 {
bunches = append(bunches, make(BunchOfTransactions, 0))
}

mergedBunch := mergeBunchesOfTransactions(bunches)[0]
mergedBunch := mergeBunchesOfTransactionsInParallel(bunches)
selection := selectUntilReachedGasRequested(mergedBunch, gasRequested)
return selection
}
Expand All @@ -34,6 +38,48 @@ func selectUntilReachedGasRequested(bunch BunchOfTransactions, gasRequested uint
return bunch
}

func mergeBunchesOfTransactionsInParallel(bunches []BunchOfTransactions) BunchOfTransactions {
// If number of bunches is odd, add a phony bunch (to ease pairing logic).
if len(bunches)%2 == 1 {
bunches = append(bunches, make(BunchOfTransactions, 0))
}

jobs := make([]*mergingJob, numJobsForMerging)

for i := 0; i < numJobsForMerging; i++ {
jobs[i] = &mergingJob{
input: make([]BunchOfTransactions, 0, len(bunches)/numJobsForMerging),
}
}

for i, bunch := range bunches {
jobs[i%numJobsForMerging].input = append(jobs[i%numJobsForMerging].input, bunch)
}

// Run jobs in parallel
wg := sync.WaitGroup{}

for _, job := range jobs {
wg.Add(1)

go func(job *mergingJob) {
job.output = mergeBunchesOfTransactions(job.input)[0]
defer wg.Done()
}(job)
}

wg.Wait()

// Merge the results of the jobs
outputBunchesOfJobs := make([]BunchOfTransactions, 0, numJobsForMerging)

for _, job := range jobs {
outputBunchesOfJobs = append(outputBunchesOfJobs, job.output)
}

return mergeBunchesOfTransactions(outputBunchesOfJobs)[0]
}

func mergeBunchesOfTransactions(bunches []BunchOfTransactions) []BunchOfTransactions {
if len(bunches) == 1 {
return bunches
Expand Down

0 comments on commit 0d96e18

Please sign in to comment.