Skip to content

Commit

Permalink
add light node pendingTx query
Browse files Browse the repository at this point in the history
  • Loading branch information
wincenteam committed Jul 29, 2020
1 parent db8b64b commit a8b4c65
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 11 deletions.
229 changes: 229 additions & 0 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import (
"sync"
"time"

"github.com/sero-cash/go-czero-import/c_superzk"
"github.com/sero-cash/go-sero/zero/txs/stx/stx_v0"

"github.com/sero-cash/go-sero/zero/localdb"

"github.com/sero-cash/go-czero-import/c_type"
"github.com/sero-cash/go-sero/zero/txtool"

"github.com/sero-cash/go-sero/crypto"
"github.com/sero-cash/go-sero/zero/stake"
"github.com/sero-cash/go-sero/zero/txs/stx"
Expand Down Expand Up @@ -113,6 +121,8 @@ type TxPoolConfig struct {
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued

StartLight bool
}

// DefaultTxPoolConfig contains the default configurations for the transaction
Expand Down Expand Up @@ -140,6 +150,169 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
return conf
}

func TxToOut(tx types.Transaction) (result []txtool.Out, txHash c_type.Uint256) {
txCommonHash := tx.Hash()
copy(txHash[:], txCommonHash[:])
ztxs := tx.GetZZSTX()
if ztxs != nil {
tx1 := tx.GetZZSTX().Tx1
for index := range tx1.Outs_P {
out := txtool.Out{}
rootState := localdb.RootState{}
os := localdb.OutState{}
out_p := &tx1.Outs_P[index]
if c_superzk.IsSzkPKr(&tx1.Outs_P[index].PKr) {
os.Out_P = out_p
os.GenRootCM()
} else {
os.Out_O = &stx_v0.Out_O{
Addr: out_p.PKr,
Asset: out_p.Asset,
Memo: out_p.Memo,
}
os.GenRootCM()
}
rootState.OS = os
rootState.TxHash = txHash
out.State = rootState
result = append(result, out)

}

for index := range tx1.Outs_C {
out := txtool.Out{}
rootState := localdb.RootState{}
os := localdb.OutState{}
os.Out_C = &tx1.Outs_C[index]
os.GenRootCM()
rootState.OS = os
rootState.TxHash = txHash
out.State = rootState
result = append(result, out)
}
}
return
}

type PKrTxOuts map[c_type.PKr]map[c_type.Uint256]*TxOutInfo

func (p PKrTxOuts) AddPendingTxOut(tx types.Transaction) {

txOuts, _ := TxToOut(tx)
time := uint64(time.Now().Unix())
for index := range txOuts {
p.AddOut(tx.From(), tx.Gas(), tx.Gas(), tx.GasPrice(), 0, common.Hash{}, time, txOuts[index])
}

}

func (p PKrTxOuts) AddImmatureTxOut(tx types.Transaction, blockNumber uint64, blockHash common.Hash, time uint64) {

txOuts, _ := TxToOut(tx)
for index := range txOuts {
p.AddOut(tx.From(), tx.Gas(), tx.Gas(), tx.GasPrice(), blockNumber, blockHash, time, txOuts[index])
}

}

func (p PKrTxOuts) AddOut(
from common.Address,
gas uint64, gasUsed uint64,
gasPrice *big.Int,
blockNumber uint64,
blockHash common.Hash, time uint64,
out txtool.Out) {
pkr := out.State.OS.ToPKr()
txHash := out.State.TxHash
if pkr != nil {
if _, ok := p[*pkr]; ok {
if _, ok := p[*pkr][txHash]; ok {
p[*pkr][txHash].addOut(txHash, from, gas, gasUsed, gasPrice, blockNumber, blockHash, time, out)
} else {
txOutInfo := &TxOutInfo{OutExists: map[c_type.Uint256]bool{}}
txOutInfo.addOut(txHash, from, gas, gasUsed, gasPrice, blockNumber, blockHash, time, out)
p[*pkr][txHash] = txOutInfo
}

} else {
txOutInfo := &TxOutInfo{OutExists: map[c_type.Uint256]bool{}}
txOutInfo.addOut(txHash, from, gas, gasUsed, gasPrice, blockNumber, blockHash, time, out)
txOutMap := make(map[c_type.Uint256]*TxOutInfo)
txOutMap[txHash] = txOutInfo
p[*pkr] = txOutMap
}
}

}

func (p PKrTxOuts) delPendintTxOut(tx types.Transaction) {
txOuts, txHash := TxToOut(tx)
for index := range txOuts {
out := txOuts[index]
pkr := out.State.OS.ToPKr()
if pkr != nil {
delete(p[*pkr], txHash)
if len(p[*pkr]) == 0 {
delete(p, *pkr)
}

}
}
}

func (p PKrTxOuts) delPkrTxOut(pkr c_type.PKr, txHash c_type.Uint256) {

delete(p[pkr], txHash)
if len(p[pkr]) == 0 {
delete(p, pkr)
}
}

type TxOutInfo struct {
TxHash c_type.Uint256
BlockNumber uint64
BlockHash common.Hash
GasUsed uint64
Gas uint64
GasPrice *big.Int
From common.Address
Time uint64
Outs []txtool.Out
OutExists map[c_type.Uint256]bool
}

func (t *TxOutInfo) addOut(txHash c_type.Uint256, from common.Address, gas, gasUsed uint64, gasPrice *big.Int, blockNumber uint64, blockHash common.Hash, time uint64, out txtool.Out) {
t.TxHash = txHash
t.BlockNumber = blockNumber
t.BlockHash = blockHash
t.Gas = gas
t.GasUsed = gasUsed
t.GasPrice = gasPrice
t.From = from
t.Time = time

var existsKey c_type.Uint256

if out.State.OS.Out_O != nil {
existsKey = out.State.OS.Out_O.ToHash()
}

if out.State.OS.Out_P != nil {
existsKey = out.State.OS.Out_P.ToHash()
}

if out.State.OS.Out_C != nil {
existsKey = out.State.OS.Out_C.Tx1_Hash()
}

if _, ok := t.OutExists[existsKey]; !ok {
t.Outs = append(t.Outs, out)
t.OutExists[existsKey] = true

}

}

// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
Expand Down Expand Up @@ -175,6 +348,8 @@ type TxPool struct {

wg sync.WaitGroup // for shutdown sync

pkrTxOuts PKrTxOuts

homestead bool
}

Expand All @@ -195,6 +370,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.pkrTxOuts = make(map[c_type.PKr]map[c_type.Uint256]*TxOutInfo)
pool.locals = newAccountSet()
pool.priced = newTxPricedList(pool.all)
pool.newQueue = newTxPricedList(newTxLookup())
Expand Down Expand Up @@ -259,6 +435,9 @@ func (pool *TxPool) loop() {
}
for _, tx := range drop {
pool.removeTx(tx.Hash())
if pool.canAddPkrTx() {
pool.pkrTxOuts.delPendintTxOut(*tx)
}
}

dropFaileds := []common.Hash{}
Expand Down Expand Up @@ -293,6 +472,14 @@ func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) {
pool.reset(oldHead, newHead)
}

func (pool *TxPool) canAddPkrTx() bool {
difference := time.Now().Unix() - pool.chain.CurrentBlock().Time().Int64()
if difference > 10*60 {
return false
}
return pool.config.StartLight
}

// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
Expand Down Expand Up @@ -323,6 +510,11 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
}
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if pool.canAddPkrTx() {
for _, tx := range add.Transactions() {
pool.pkrTxOuts.AddImmatureTxOut(*tx, add.Number().Uint64(), add.Hash(), add.Time().Uint64())
}
}
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
Expand All @@ -335,6 +527,12 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
return
}
included = append(included, add.Transactions()...)
if pool.canAddPkrTx() {
for _, tx := range add.Transactions() {
pool.pkrTxOuts.AddImmatureTxOut(*tx, add.Number().Uint64(), add.Hash(), add.Time().Uint64())
}
}

if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
Expand All @@ -359,6 +557,11 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {

if len(included) == 0 {
add := pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
if pool.canAddPkrTx() {
for _, tx := range add.Transactions() {
pool.pkrTxOuts.AddImmatureTxOut(*tx, add.Number().Uint64(), add.Hash(), add.Time().Uint64())
}
}
included = append(included, add.Transactions()...)
}

Expand Down Expand Up @@ -446,6 +649,26 @@ func (pool *TxPool) Content() (types.Transactions, types.Transactions) {
return pending, queued
}

func (pool *TxPool) PendingOuts(pkr c_type.PKr) map[c_type.Uint256]*TxOutInfo {
pool.mu.Lock()
defer pool.mu.Unlock()
return pool.pkrTxOuts[pkr]
}

func (pool *TxPool) DelMaturedOuts(pkr c_type.PKr, txHash c_type.Uint256, currentNum uint64) {
pool.mu.Lock()
defer pool.mu.Unlock()
if txHashMap, ok := pool.pkrTxOuts[pkr]; ok {
for k, v := range txHashMap {
if v.BlockNumber != 0 && v.BlockNumber < currentNum {
pool.pkrTxOuts.delPkrTxOut(pkr, k)
}
}
}
pool.pkrTxOuts.delPkrTxOut(pkr, txHash)

}

// Pending retrieves all currently processable transactions, groupped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
Expand Down Expand Up @@ -626,6 +849,9 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
drop := pool.priced.Discard(pool.gasPrice, pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1))
for _, tx := range drop {
pool.removeTx(tx.Hash())
if pool.canAddPkrTx() {
pool.pkrTxOuts.delPendintTxOut(*tx)
}
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "priced", tx.GasPrice())
}
}
Expand All @@ -634,6 +860,9 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
if err != nil {
return false, err
}
if pool.canAddPkrTx() {
pool.pkrTxOuts.AddPendingTxOut(*tx)
}
log.Trace("Pooled new future transaction", "hash", hash, "from", tx.From(), "to", tx.To())
return flag, nil
}
Expand Down
8 changes: 4 additions & 4 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1262,10 +1262,10 @@ func (s *PublicBlockChainAPI) doCall(ctx context.Context, args CallArgs, blockNr

return nil, 0, false, err
}
if failed {
log.Info("call error", "msg", string(res))
}
log.Info("result", "data", hexutil.Encode(res))
//if failed {
// log.Info("call error", "msg", string(res))
//}
//log.Info("result", "data", hexutil.Encode(res))
return res, gas, failed, err

}
Expand Down
17 changes: 17 additions & 0 deletions internal/ethapi/api_light_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ func (plna PublicLightNodeApi) GetOutsByPKr(ctx context.Context, addresses []*Mi
return plna.b.GetOutByPKr(pkrs, start, end)
}

func (plna PublicLightNodeApi) GetPendingOuts(ctx context.Context, addresses []*PKrAddress) (outBlockResp light.BlockOutResp, e error) {

pkrs := []c_type.PKr{}
for _, address := range addresses {
addr := *address
if len(addr) == 96 {
var pkr c_type.PKr
copy(pkr[:], addr[:])
pkrs = append(pkrs, pkr)
} else {
return outBlockResp, fmt.Errorf("address is invalid")
}
}
return light.Current_light.GetPendingOuts(pkrs)

}

func (plna PublicLightNodeApi) CheckNil(Nils []c_type.Uint256) (nilResps []light.NilValue, e error) {

return plna.b.CheckNil(Nils)
Expand Down
5 changes: 5 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,11 @@ web3._extend({
call: 'light_getOutsByPKr',
params: 3
}),
new web3._extend.Method({
name: 'getPendingOuts',
call: 'light_getPendingOuts',
params: 1
}),
new web3._extend.Method({
name: 'checkNil',
call: 'light_checkNil',
Expand Down
6 changes: 6 additions & 0 deletions sero/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"math/big"
"time"

"github.com/sero-cash/go-sero/zero/txtool/flight"

Expand Down Expand Up @@ -264,6 +265,11 @@ func (b *SeroAPIBackend) GetAnchor(roots []c_type.Uint256) ([]txtool.Witness, er

}
func (b *SeroAPIBackend) CommitTx(tx *txtool.GTx) error {

difference := time.Now().Unix() - b.CurrentBlock().Time().Int64()
if difference > 10*60 {
return errors.New("The current chain is too behind")
}
gasPrice := big.Int(tx.GasPrice)
gas := uint64(tx.Gas)
signedTx := types.NewTxWithGTx(gas, &gasPrice, &tx.Tx)
Expand Down
Loading

0 comments on commit a8b4c65

Please sign in to comment.