Skip to content

Commit

Permalink
Optimize tx sync
Browse files Browse the repository at this point in the history
  • Loading branch information
wincenteam committed Nov 25, 2021
1 parent d8331a3 commit 56c725e
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 35 deletions.
1 change: 1 addition & 0 deletions cmd/gero/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ var (
utils.ConfirmedBlockFlag,
utils.RecordBlockShareNumber,
utils.LightNodeFlag,
utils.CloseAcceptTx,
utils.ResetBlockNumber,

utils.DeveloperFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ var (
Usage: "start light node",
}

CloseAcceptTx = cli.BoolFlag{
Name: "closeAcceptTx",
Usage: "Close Accept from remote Tx",
}

ConfirmedBlockFlag = cli.Uint64Flag{
Name: "confirmedBlock",
Usage: "The balance will be confirmed after the current block of number,default is 12",
Expand Down Expand Up @@ -1289,6 +1294,10 @@ func SetSeroConfig(ctx *cli.Context, stack *node.Node, cfg *sero.Config) {
cfg.StartLight = true
}

if ctx.GlobalIsSet(CloseAcceptTx.Name) {
cfg.CloseAcceptTx = true
}

// Override any default configs for hard coded networks.
switch {
case ctx.GlobalBool(AlphanetFlag.Name):
Expand Down
2 changes: 1 addition & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ LOOP:
err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool)
switch err {
case core.ErrGasLimitReached:
log.Info("Gas limit exceeded for current block", "block", bc.CurrentBlock().Header().Number.Uint64(), "txHash", tx.Hash().Hex())
log.Info("Gas limit exceeded for current block", "block", bc.CurrentBlock().Header().Number.Uint64())
// Pop the current out-of-gas transaction without shifting in the next from the account
//log.Trace("Gas limit exceeded for current block", "sender", tx.From())
txs.Pop()
Expand Down
2 changes: 1 addition & 1 deletion sero/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Sero, error) {

sero.miner = miner.New(sero, sero.chainConfig, sero.EventMux(), sero.voter, sero.engine)

if sero.protocolManager, err = NewProtocolManager(sero.chainConfig, config.SyncMode, config.NetworkId, sero.eventMux, sero.voter, sero.txPool, sero.miner, sero.engine, sero.blockchain, chainDb); err != nil {
if sero.protocolManager, err = NewProtocolManager(sero.chainConfig, config.CloseAcceptTx, config.SyncMode, config.NetworkId, sero.eventMux, sero.voter, sero.txPool, sero.miner, sero.engine, sero.blockchain, chainDb); err != nil {
return nil, err
}
sero.miner.SetExtra(makeExtraData(config.ExtraData))
Expand Down
1 change: 1 addition & 0 deletions sero/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Config struct {
SyncMode downloader.SyncMode
NoPruning bool

CloseAcceptTx bool
MineMode bool
StartExchange bool
AutoMerge bool
Expand Down
77 changes: 45 additions & 32 deletions sero/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ func errResp(code errCode, format string, v ...interface{}) error {
type ProtocolManager struct {
networkID uint64

fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)

txpool txPool
miner sero_miner
voter shareVoter
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
closeAcceptTx bool
txpool txPool
miner sero_miner
voter shareVoter
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int

downloader *downloader.Downloader
fetcher *fetcher.Fetcher
Expand Down Expand Up @@ -103,21 +103,22 @@ type ProtocolManager struct {

// NewProtocolManager returns a new Sero sub protocol manager. The Sero sub protocol manages peers capable
// with the Sero network.
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, voter shareVoter, txpool txPool, miner sero_miner, engine consensus.Engine, blockchain *core.BlockChain, chaindb serodb.Database) (*ProtocolManager, error) {
func NewProtocolManager(config *params.ChainConfig, closeAcceptTx bool, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, voter shareVoter, txpool txPool, miner sero_miner, engine consensus.Engine, blockchain *core.BlockChain, chaindb serodb.Database) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
eventMux: mux,
txpool: txpool,
miner: miner,
voter: voter,
blockchain: blockchain,
chainconfig: config,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: networkID,
eventMux: mux,
closeAcceptTx: closeAcceptTx,
txpool: txpool,
miner: miner,
voter: voter,
blockchain: blockchain,
chainconfig: config,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
Expand Down Expand Up @@ -630,10 +631,20 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}

case msg.Code == TxMsg:
if pm.closeAcceptTx {
break
}
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
break
}
currentBlock := pm.blockchain.CurrentBlock()
difference := time.Now().Unix() - currentBlock.Time().Int64()
if difference > 2*60 {
log.Info("to behind,dont receive remote txs")
break
}

// Transactions can be processed, parse all of them and deliver to the pool
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {
Expand All @@ -647,16 +658,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.peers.AddKnowTx(p.id, tx.Hash())
//p.MarkTransaction(tx.Hash())
}
currentBlock := pm.blockchain.CurrentBlock()
difference := time.Now().Unix() - currentBlock.Time().Int64()
if difference < 10*60 {
errs := pm.txpool.AddRemotes(txs)
addedTxs := len(txs) - len(errs)
if addedTxs > 0 {
log.Debug("received from", "remote peer", p.RemoteAddr().String(), "txs", len(txs), "added", addedTxs)
}
} else {
log.Trace("to behind,dont receive remote txs")

errs := pm.txpool.AddRemotes(txs)
addedTxs := len(txs) - len(errs)
if addedTxs > 0 {
log.Debug("received from", "remote peer", p.RemoteAddr().String(), "txs", len(txs), "added", addedTxs)
}

case msg.Code == NewVoteMsg:
Expand Down Expand Up @@ -731,6 +737,13 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
}

currentBlock := pm.blockchain.CurrentBlock()
difference := time.Now().Unix() - currentBlock.Time().Int64()
if difference > 2*60 {
return
}

// FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for peer, txs := range txset {

Expand Down
2 changes: 1 addition & 1 deletion sero/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (p *peer) SendTransactions(txs types.Transactions) error {
// p.knownTxs.Add(tx.Hash())
//}
if len(txs) > 0 {
subLen := 400
subLen := 200
start := 0
for {
if start >= len(txs) {
Expand Down

0 comments on commit 56c725e

Please sign in to comment.