Skip to content

Commit

Permalink
add logic to stop writing depending on flags
Browse files Browse the repository at this point in the history
  • Loading branch information
minhd-vu committed Jun 23, 2023
1 parent 9934cae commit 9cce611
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 14 deletions.
16 changes: 16 additions & 0 deletions p2p/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,27 @@ import (
// to. To use another database solution, just implement these methods and
// update the sensor to use the new connection.
type Database interface {
// WriteBlock will write the both the block and block event to the database
// if ShouldWriteBlocks and ShouldWriteBlockEvents return true, respectively.
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int)

// WriteBlockHeaders will write the block headers if ShouldWriteBlocks
// returns true.
WriteBlockHeaders(context.Context, []*types.Header)

// WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents
// returns true.
WriteBlockHashes(context.Context, *enode.Node, []common.Hash)

// WriteBlockBodies will write the block bodies if ShouldWriteBlocks returns
// true.
WriteBlockBody(context.Context, *eth.BlockBody, common.Hash)

// WriteTransactions will write the both the transaction and transaction
// event to the database if ShouldWriteTransactions and
// ShouldWriteTransactionEvents return true, respectively.
WriteTransactions(context.Context, *enode.Node, []*types.Transaction)

HasParentBlock(context.Context, common.Hash) bool

MaxConcurrentWrites() int
Expand Down
40 changes: 31 additions & 9 deletions p2p/database/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,13 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {

// WriteBlock writes the block and the block event to datastore.
func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int) {
d.writeEvent(peer, blockEventsKind, block.Hash(), blocksKind)
if d.ShouldWriteBlockEvents() {
d.writeEvent(peer, blockEventsKind, block.Hash(), blocksKind)
}

if !d.ShouldWriteBlocks() {
return
}

key := datastore.NameKey(blocksKind, block.Hash().Hex(), nil)
var dsBlock DatastoreBlock
Expand Down Expand Up @@ -180,6 +186,10 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
// requested. The block events will be written when the hash is received
// instead.
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header) {
if !d.ShouldWriteBlocks() {
return
}

for _, header := range headers {
d.writeBlockHeader(ctx, header)
}
Expand All @@ -191,6 +201,10 @@ func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Head
// instead. It will write the uncles and transactions to datastore if they
// don't already exist.
func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash) {
if !d.ShouldWriteBlocks() {
return
}

key := datastore.NameKey(blocksKind, hash.Hex(), nil)
var block DatastoreBlock

Expand Down Expand Up @@ -224,13 +238,25 @@ func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, has

// WriteBlockHashes will write the block events to datastore.
func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash) {
d.writeEvents(ctx, peer, blockEventsKind, hashes, blocksKind)
if d.ShouldWriteBlockEvents() {
d.writeEvents(ctx, peer, blockEventsKind, hashes, blocksKind)
}
}

// WriteTransactions will write the transactions and transaction events to datastore.
func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction) {
hashes := d.writeTransactions(ctx, txs)
d.writeEvents(ctx, peer, transactionEventsKind, hashes, transactionsKind)
if d.ShouldWriteTransactions() {
d.writeTransactions(ctx, txs)
}

if d.ShouldWriteTransactionEvents() {
hashes := make([]common.Hash, 0, len(txs))
for _, tx := range txs {
hashes = append(hashes, tx.Hash())
}

d.writeEvents(ctx, peer, transactionEventsKind, hashes, transactionsKind)
}
}

func (d *Datastore) MaxConcurrentWrites() int {
Expand Down Expand Up @@ -376,20 +402,16 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header)

// writeTransactions will write the transactions to datastore and return the
// transaction hashes.
func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction) []common.Hash {
hashes := make([]common.Hash, 0, len(txs))
func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction) {
keys := make([]*datastore.Key, 0, len(txs))
transactions := make([]*DatastoreTransaction, 0, len(txs))

for _, tx := range txs {
hashes = append(hashes, tx.Hash())
keys = append(keys, datastore.NameKey(transactionsKind, tx.Hash().Hex(), nil))
transactions = append(transactions, newDatastoreTransaction(tx))
}

if _, err := d.client.PutMulti(ctx, keys, transactions); err != nil {
log.Error().Err(err).Msg("Failed to write transactions")
}

return hashes
}
10 changes: 5 additions & 5 deletions p2p/rlpx.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
atomic.AddInt32(&count.BlockHeaders, int32(len(msg.BlockHeadersPacket)))
c.logger.Trace().Msgf("Received %v BlockHeaders", len(msg.BlockHeadersPacket))

if db != nil {
if db != nil && db.ShouldWriteBlocks() {
for _, header := range msg.BlockHeadersPacket {
if err := c.getParentBlock(ctx, db, header); err != nil {
return err
Expand Down Expand Up @@ -259,7 +259,7 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
}
}

if db != nil && db.ShouldWriteBlocks() && len(hashes) > 0 {
if db != nil && db.ShouldWriteBlockEvents() && len(hashes) > 0 {
dbCh <- struct{}{}
go func() {
db.WriteBlockHashes(ctx, c.node, hashes)
Expand All @@ -270,7 +270,7 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
atomic.AddInt32(&count.Blocks, 1)
c.logger.Trace().Str("hash", msg.Block.Hash().Hex()).Msg("Received NewBlock")

if db != nil && db.ShouldWriteBlocks() {
if db != nil && (db.ShouldWriteBlocks() || db.ShouldWriteBlockEvents()) {
if err := c.getParentBlock(ctx, db, msg.Block.Header()); err != nil {
return err
}
Expand All @@ -285,7 +285,7 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
atomic.AddInt32(&count.Transactions, int32(len(*msg)))
c.logger.Trace().Msgf("Received %v Transactions", len(*msg))

if db != nil && db.ShouldWriteTransactions() {
if db != nil && (db.ShouldWriteTransactions() || db.ShouldWriteTransactionEvents()) {
dbCh <- struct{}{}
go func() {
db.WriteTransactions(ctx, c.node, *msg)
Expand All @@ -296,7 +296,7 @@ func (c *Conn) ReadAndServe(db database.Database, count *MessageCount) error {
atomic.AddInt32(&count.Transactions, int32(len(msg.PooledTransactionsPacket)))
c.logger.Trace().Msgf("Received %v PooledTransactions", len(msg.PooledTransactionsPacket))

if db != nil && db.ShouldWriteTransactions() {
if db != nil && (db.ShouldWriteTransactions() || db.ShouldWriteTransactionEvents()) {
dbCh <- struct{}{}
go func() {
db.WriteTransactions(ctx, c.node, msg.PooledTransactionsPacket)
Expand Down

0 comments on commit 9cce611

Please sign in to comment.