Skip to content

Commit

Permalink
pls, tx: Added count goroutine related db with sync.WaitGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
Jin committed Apr 17, 2020
1 parent 53a12e1 commit 4838a27
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 23 deletions.
5 changes: 4 additions & 1 deletion pls/rootchain_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type RootChainManager struct {
invalidExits map[uint64]map[uint64]invalidExits

// channels
wg *sync.WaitGroup
quit chan struct{}
epochPreparedCh chan *rootchain.RootChainEpochPrepared
blockFinalizedCh chan *rootchain.RootChainBlockFinalized
Expand Down Expand Up @@ -129,6 +130,7 @@ func NewRootChainManager(
miner: miner,
minerEnv: env,
invalidExits: make(map[uint64]map[uint64]invalidExits),
wg: new(sync.WaitGroup),
quit: make(chan struct{}),
epochPreparedCh: make(chan *rootchain.RootChainEpochPrepared, MAX_EPOCH_EVENTS),
blockFinalizedCh: make(chan *rootchain.RootChainBlockFinalized),
Expand All @@ -154,7 +156,7 @@ func (rcm *RootChainManager) Start() error {
}

go rcm.pingBackend()
rcm.txManager.Start()
rcm.txManager.Start(rcm.wg)

if rcm.config.NodeMode == ModeOperator {
go rcm.miner.Start(rcm.config.Operator.Address, new(rootchain.RootChainEpochPrepared), true)
Expand All @@ -166,6 +168,7 @@ func (rcm *RootChainManager) Start() error {
func (rcm *RootChainManager) Stop() error {
rcm.backend.Close()
rcm.txManager.Stop()
rcm.wg.Wait()
close(rcm.quit)
return nil
}
Expand Down
27 changes: 16 additions & 11 deletions tx/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (tm *TransactionManager) Add(account accounts.Account, raw *RawTransaction,

// enqueue raw transaction
tm.pending[addr] = append(tm.pending[addr], raw)
WritePendingTxs(tm.db, addr, tm.pending[addr])
WritePendingTxs(tm.db, tm.quit, addr, tm.pending[addr])

log.Info("Raw transaction added", "caption", raw.getCaption(), "from", raw.From)

Expand Down Expand Up @@ -256,8 +256,8 @@ func (tm *TransactionManager) Count(account accounts.Account, tx *types.Transact
return count
}

func (tm *TransactionManager) Start() {
go tm.confirmLoop()
func (tm *TransactionManager) Start(wg *sync.WaitGroup) {
go tm.confirmLoop(wg)

// send a single raw transaction to root chain.
// TODO: make it safe under root chain provider disconnect
Expand Down Expand Up @@ -335,7 +335,7 @@ func (tm *TransactionManager) Start() {
raw.LastSentBlockNumber = blockNumber

tm.lock.Lock()
WritePendingTxs(tm.db, addr, tm.pending[addr])
WritePendingTxs(tm.db, tm.quit, addr, tm.pending[addr])
tm.lock.Unlock()

err = tm.backend.SendTransaction(context.Background(), signedTx)
Expand Down Expand Up @@ -421,7 +421,9 @@ func (tm *TransactionManager) Start() {
}

for addr, _ := range tm.pending {
wg.Add(1)
go func(addr common.Address) {
defer wg.Done()
log.Trace("TransactionManager iterates", "addr", addr)
queue := tm.pending[addr]

Expand Down Expand Up @@ -616,8 +618,8 @@ func (tm *TransactionManager) clearQueue(addr common.Address) {
if l != 0 {
tm.unconfirmed[addr] = append(tm.unconfirmed[addr], minedRaws...)
tm.pending[addr] = tm.pending[addr][l:]
WritePendingTxs(tm.db, addr, tm.pending[addr])
WriteUnconfirmedTxs(tm.db, addr, tm.unconfirmed[addr])
WritePendingTxs(tm.db, tm.quit, addr, tm.pending[addr])
WriteUnconfirmedTxs(tm.db, tm.quit, addr, tm.unconfirmed[addr])
}
}

Expand Down Expand Up @@ -662,8 +664,8 @@ func (tm *TransactionManager) confirmQueue(addr common.Address) {
tm.unconfirmed[addr] = newUnconfirmed
sort.Sort(RawTransactionsByIndex(tm.pending[addr]))

WriteUnconfirmedTxs(tm.db, addr, tm.unconfirmed[addr])
WritePendingTxs(tm.db, addr, tm.pending[addr])
WriteUnconfirmedTxs(tm.db, tm.quit, addr, tm.unconfirmed[addr])
WritePendingTxs(tm.db, tm.quit, addr, tm.pending[addr])
}

// remove already confirmed raw transactions
Expand All @@ -688,8 +690,8 @@ func (tm *TransactionManager) confirmQueue(addr common.Address) {
// update database
if i != 0 {
tm.unconfirmed[addr] = tm.unconfirmed[addr][i:]
WriteNumConfirmedRawTxs(tm.db, addr, numConfirmed)
WriteUnconfirmedTxs(tm.db, addr, tm.unconfirmed[addr])
WriteNumConfirmedRawTxs(tm.db, tm.quit, addr, numConfirmed)
WriteUnconfirmedTxs(tm.db, tm.quit, addr, tm.unconfirmed[addr])
}
}

Expand All @@ -705,7 +707,10 @@ func (tm *TransactionManager) indexOf(addr common.Address) int {
}

// TODO: use SubscribeNewHead with disconnection handling
func (tm *TransactionManager) confirmLoop() {
func (tm *TransactionManager) confirmLoop(wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()

closed := false

newHeaderCh := make(chan *types.Header)
Expand Down
16 changes: 10 additions & 6 deletions tx/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ func makeTestManager(db ethdb.Database) *TransactionManager {
}

func TestBasic(t *testing.T) {
var wg sync.WaitGroup
db := rawdb.NewMemoryDatabase()
tm := makeTestManager(db)

tm.Start()
tm.Start(&wg)

// addrs[0] send 1 ETH to addrs[1] n1 times
n1 := 10
Expand Down Expand Up @@ -173,17 +174,19 @@ func TestBasic(t *testing.T) {
}

func TestRestart(t *testing.T) {
var wg sync.WaitGroup

db := rawdb.NewMemoryDatabase()
tm := makeTestManager(db)
tm.Start()
tm.Start(&wg)

// addrs[0] sends n1 transactions
n1 := 10
nonce1, _ := backend.NonceAt(context.Background(), addrs[0], nil)

for i := 0; i < n1; i++ {
rawTx := NewRawTransaction(addrs[0], 21000, &addrs[1], big.NewInt(int64(1e18+i)), []byte{}, false, fmt.Sprintf("raw tx %d", i))
if err := tm.Add(accs[0], rawTx, false); err != nil {
if err := tm.Add(accs[0], rawTx,false); err != nil {
t.Fatalf("Failed to add rawTx: %v", err)
}
log.Debug(fmt.Sprintf("raw tx %d added", i))
Expand All @@ -198,7 +201,7 @@ func TestRestart(t *testing.T) {

<-time.NewTimer(5 * time.Second).C
tm = makeTestManager(db)
tm.Start()
tm.Start(&wg)
log.Info("TranasctionManager restarted")

// addrs[0] sends n2 transactions
Expand Down Expand Up @@ -229,13 +232,14 @@ func TestRestart(t *testing.T) {
}

func TestCongestedNetwork(t *testing.T) {
var wg sync.WaitGroup
db := rawdb.NewMemoryDatabase()
tm := makeTestManager(db)

tm.Start()
tm.Start(&wg)

// addrs[0] send n1 transactions
n1 := 10
n1 := 3
nonce1, _ := backend.NonceAt(context.Background(), addrs[0], nil)

// addrs[1, ..., 8] send lots of tx to congest network
Expand Down
15 changes: 10 additions & 5 deletions tx/rawdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,18 @@ func ReadNumConfirmedRawTxs(db ethdb.Reader, addr common.Address) uint64 {
return n
}

func WriteNumConfirmedRawTxs(db ethdb.KeyValueWriter, addr common.Address, n uint64) {
func WriteNumConfirmedRawTxs(db ethdb.KeyValueWriter, quit chan struct{}, addr common.Address, n uint64) {
data, err := rlp.EncodeToBytes(n)
if err != nil {
log.Crit("Failed to encode number of raw transactions", "err", err)
}
if err := db.Put(numConfirmedRawTxsKey(addr), data); err != nil {
log.Crit("Failed to store number of raw transactions", "err", err)
select {
case <-quit:
return
default:
if err := db.Put(unconfirmedTxsKey(addr), data); err != nil {
log.Crit("Failed to store unconfirmed transactions", "err", err)
}
}
}

Expand Down Expand Up @@ -258,7 +263,7 @@ func ReadUnconfirmedTxs(db ethdb.Reader, addr common.Address) RawTransactions {
return txs
}

func WriteUnconfirmedTxs(db ethdb.KeyValueWriter, addr common.Address, txs RawTransactions) {
func WriteUnconfirmedTxs(db ethdb.KeyValueWriter, quit chan struct{}, addr common.Address, txs RawTransactions) {
data, err := rlp.EncodeToBytes(txs)
if err != nil {
log.Crit("Failed to encode unconfirmed transactions", "err", err)
Expand Down Expand Up @@ -288,7 +293,7 @@ func ReadPendingTxs(db ethdb.Reader, addr common.Address) RawTransactions {
return txs
}

func WritePendingTxs(db ethdb.KeyValueWriter, addr common.Address, txs RawTransactions) {
func WritePendingTxs(db ethdb.KeyValueWriter, quit chan struct{}, addr common.Address, txs RawTransactions) {
data, err := rlp.EncodeToBytes(txs)
if err != nil {
log.Crit("Failed to encode pending transactions", "err", err)
Expand Down

0 comments on commit 4838a27

Please sign in to comment.