This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Merge pull request #45 from itzmeanjan/develop
Controlled Concurrency
itzmeanjan authored Jan 8, 2021
2 parents cdbfbf0 + 7f9f769 commit 8ee7f15
Showing 27 changed files with 932 additions and 778 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -95,6 +95,9 @@ cd ette
```

- For testing historical data queries using the browser-based GraphQL Playground in `ette`, you can set `EtteGraphQLPlayGround` to `yes` in the config file
- For processing block(s)/ tx(s) concurrently, it'll create `ConcurrencyFactor * #-of CPUs on machine` workers, which pick up jobs submitted to them
- If nothing is specified, `ConcurrencyFactor` defaults to 1, so on a machine with 4 CPUs `ette` spawns a worker pool of size 4. More jobs can still be submitted, but at most 4 run at a time; see the sketch after the config example below
- 👆 is done to control the concurrency level, putting more control in the user's hands

```
RPCUrl=https://<domain-name>
@@ -113,6 +116,7 @@ Domain=localhost
Production=yes
EtteMode=3
EtteGraphQLPlayGround=yes
ConcurrencyFactor=2
```
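
As a rough illustration of that sizing rule, here is a minimal, self-contained sketch using the `gammazero/workerpool` package that `ette` relies on; the job body and job count are made up for demonstration:

```
package main

import (
	"fmt"
	"runtime"
	"time"

	"github.com/gammazero/workerpool"
)

func main() {
	// With ConcurrencyFactor=2 on a 4-CPU machine, pool size becomes 8:
	// any number of jobs can be queued, but at most 8 run at once
	concurrencyFactor := 2
	wp := workerpool.New(runtime.NumCPU() * concurrencyFactor)

	for i := 0; i < 100; i++ {
		i := i

		wp.Submit(func() {
			// Stand-in for real per-block/ per-tx processing work
			time.Sleep(10 * time.Millisecond)
			fmt.Printf("[+] Processed job %d\n", i)
		})
	}

	// Waits for all queued jobs to finish, then releases workers
	wp.StopWait()
}
```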
- Create another file in the same directory, named `.plans.json`, whose content will look like 👇.
56 changes: 49 additions & 7 deletions app/app.go
@@ -3,9 +3,12 @@ package app
import (
"context"
"log"
"os"
"os/signal"
"sync"

"github.com/go-redis/redis/v8"
"github.com/gookit/color"
blk "github.com/itzmeanjan/ette/app/block"
cfg "github.com/itzmeanjan/ette/app/config"
d "github.com/itzmeanjan/ette/app/data"
@@ -16,7 +19,7 @@ import (
)

// Setting ground up
func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConnection, *redis.Client, *gorm.DB, *sync.Mutex, *d.SyncState) {
func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConnection, *redis.Client, *gorm.DB, *d.StatusHolder) {
err := cfg.Read(configFile)
if err != nil {
log.Fatalf("[!] Failed to read `.env` : %s\n", err.Error())
@@ -48,19 +51,58 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
// for resolving graphQL queries
graph.GetDatabaseConnection(_db)

_lock := &sync.Mutex{}
_synced := &d.SyncState{Done: 0, BlockCountAtStartUp: db.GetBlockCount(_db), NewBlocksInserted: 0}
_status := &d.StatusHolder{
State: &d.SyncState{BlockCountAtStartUp: db.GetBlockCount(_db)},
Mutex: &sync.RWMutex{},
}

return _connection, _redisClient, _db, _lock, _synced
return _connection, _redisClient, _db, _status
}

// Run - Application to be invoked from main runner using this function
func Run(configFile, subscriptionPlansFile string) {
_connection, _redisClient, _db, _lock, _synced := bootstrap(configFile, subscriptionPlansFile)
_connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile)
_redisInfo := d.RedisInfo{
Client: _redisClient,
QueueName: "blocks",
}

// Attempting to listen for Ctrl+C signal
// and, when received, gracefully shutting down `ette`
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt)

// All resources being used get cleaned up
// when we're returning from this function scope
go func() {

<-interruptChan

sql, err := _db.DB()
if err != nil {
log.Printf(color.Red.Sprintf("[!] Failed to get underlying DB connection : %s", err.Error()))
return
}

if err := sql.Close(); err != nil {
log.Printf(color.Red.Sprintf("[!] Failed to close underlying DB connection : %s", err.Error()))
return
}

if err := _redisInfo.Client.Close(); err != nil {
log.Printf(color.Red.Sprintf("[!] Failed to close connection to Redis : %s", err.Error()))
return
}

// Stopping process
log.Printf(color.Magenta.Sprintf("\n[+] Gracefully shut down `ette`"))
os.Exit(0)

}()

// Pushing block header propagation listener to another thread of execution
go blk.SubscribeToNewBlocks(_connection, _db, _lock, _synced, _redisClient, "blocks")
go blk.SubscribeToNewBlocks(_connection, _db, _status, &_redisInfo)

// Starting http server on main thread
rest.RunHTTPServer(_db, _lock, _synced, _redisClient)
rest.RunHTTPServer(_db, _status, _redisClient)
}
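
For context, `d.StatusHolder` folds the old `*sync.Mutex` + `*d.SyncState` pair into a single value. Below is a sketch of what such a type could look like, reconstructed only from the identifiers visible in this diff; the method body is an assumption:

```
package data

import "sync"

// SyncState - Progress of `ette`, in terms of data syncing
type SyncState struct {
	Done                uint64
	BlockCountAtStartUp uint64
	NewBlocksInserted   uint64
}

// StatusHolder - Wraps sync state with a RWMutex, so that
// multiple worker go routines can safely read/ update progress
type StatusHolder struct {
	State *SyncState
	Mutex *sync.RWMutex
}

// IncrementBlocksProcessed - Safely bumps the processed-block
// counter; a sketch of what the diff's call sites imply
func (s *StatusHolder) IncrementBlocksProcessed() {
	s.Mutex.Lock()
	defer s.Mutex.Unlock()

	s.State.Done++
}
```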
194 changes: 90 additions & 104 deletions app/block/block.go
@@ -1,168 +1,154 @@
package block

import (
"context"
"fmt"
"log"
"math/big"
"sync"
"runtime"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/go-redis/redis/v8"
"github.com/gammazero/workerpool"
"github.com/gookit/color"
cfg "github.com/itzmeanjan/ette/app/config"
d "github.com/itzmeanjan/ette/app/data"
"github.com/itzmeanjan/ette/app/db"
"gorm.io/gorm"
)

// Fetching block content using blockHash
func fetchBlockByHash(client *ethclient.Client, hash common.Hash, number string, _db *gorm.DB, redisClient *redis.Client, redisKey string, _lock *sync.Mutex, _synced *d.SyncState) {
block, err := client.BlockByHash(context.Background(), hash)
if err != nil {
// Pushing block number into Redis queue for retrying later
pushBlockHashIntoRedisQueue(redisClient, redisKey, number)
// ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data
func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder) {

log.Print(color.Red.Sprintf("[!] Failed to fetch block by hash [ block : %s] : %s", number, err.Error()))
return
}
// Closure managing publishing whole block data i.e. block header, txn(s), event logs
// on redis pubsub channel
pubsubWorker := func(txns []*db.PackedTransaction) *db.PackedBlock {

// Publishes block data to all listening parties
// on `block` channel
publishBlock := func() {
if err := redisClient.Publish(context.Background(), "block", &d.Block{
Hash: block.Hash().Hex(),
Number: block.NumberU64(),
Time: block.Time(),
ParentHash: block.ParentHash().Hex(),
Difficulty: block.Difficulty().String(),
GasUsed: block.GasUsed(),
GasLimit: block.GasLimit(),
Nonce: block.Nonce(),
Miner: block.Coinbase().Hex(),
Size: float64(block.Size()),
TransactionRootHash: block.TxHash().Hex(),
ReceiptRootHash: block.ReceiptHash().Hex(),
}).Err(); err != nil {
log.Print(color.Red.Sprintf("[!] Failed to publish block %d in channel : %s", block.NumberU64(), err.Error()))
}
}
// Constructing block data to be published & persisted
packedBlock := BuildPackedBlock(block, txns)

// Controlling behaviour of ette depending upon value of `EtteMode`
switch cfg.Get("EtteMode") {
case "1":
if !db.StoreBlock(_db, block, _lock, _synced) {
// Pushing block number into Redis queue for retrying later
// because it failed to store block in database
pushBlockHashIntoRedisQueue(redisClient, redisKey, number)
return
// Attempting to publish whole block data to redis pubsub channel
if publishable && (cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") {
PublishBlock(packedBlock, redis)
}
case "2":
publishBlock()
case "3":
// Try completing task of publishing block data first,
// then we'll attempt to store it; if that fails, we'll push it to retry queue
publishBlock()

if !db.StoreBlock(_db, block, _lock, _synced) {
// Pushing block number into Redis queue for retrying later
// because it failed to store block in database
pushBlockHashIntoRedisQueue(redisClient, redisKey, number)
return
}
}

fetchBlockContent(client, block, _db, redisClient, redisKey, true, _lock, _synced)
}
return packedBlock

// Fetching block content using block number
func fetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, redisClient *redis.Client, redisKey string, _lock *sync.Mutex, _synced *d.SyncState) {
_num := big.NewInt(0)
_num = _num.SetUint64(number)
}

block, err := client.BlockByNumber(context.Background(), _num)
if err != nil {
// Pushing block number into Redis queue for retrying later
pushBlockHashIntoRedisQueue(redisClient, redisKey, fmt.Sprintf("%d", number))
if block.Transactions().Len() == 0 {

log.Print(color.Red.Sprintf("[!] Failed to fetch block by number [ block : %d ] : %s", number, err))
return
}
// Constructing block data to be persisted
//
// This is what we just published on pubsub channel
packedBlock := pubsubWorker(nil)

// Either creates new entry or updates existing one
if !db.StoreBlock(_db, block, _lock, _synced) {
// Pushing block number into Redis queue for retrying later
pushBlockHashIntoRedisQueue(redisClient, redisKey, fmt.Sprintf("%d", number))
return
}
// If the block doesn't contain any tx, we'll attempt to persist only the block
if err := db.StoreBlock(_db, packedBlock, status); err != nil {

fetchBlockContent(client, block, _db, redisClient, redisKey, false, _lock, _synced)
}
// If failed to persist, we'll put it in retry queue
pushBlockHashIntoRedisQueue(redis, block.Number().String())
return

// Fetching all transactions in this block, along with their receipt
func fetchBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redisClient *redis.Client, redisKey string, publishable bool, _lock *sync.Mutex, _synced *d.SyncState) {
if block.Transactions().Len() == 0 {
}

// Successfully processed block
log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s)", block.NumberU64()))
status.IncrementBlocksProcessed()

safeUpdationOfSyncState(_lock, _synced)
return

}

// Communication channel to be shared between multiple executing go routines
// which are trying to fetch all tx(s) present in block, concurrently
returnValChan := make(chan bool)
returnValChan := make(chan *db.PackedTransaction, runtime.NumCPU()*int(cfg.GetConcurrencyFactor()))

// -- Tx processing starting
// Creating job processor queue
// which will process all tx(s), concurrently
wp := workerpool.New(runtime.NumCPU() * int(cfg.GetConcurrencyFactor()))

// Concurrently trying to process all tx(s) for this block, in hope of better performance
for _, v := range block.Transactions() {

// Concurrently trying to fetch multiple tx(s) present in block
// and expecting their status result to be published on shared channel
// and expecting their return value to be published on shared channel
//
// Which is being read 👇
go func(tx *types.Transaction) {
fetchTransactionByHash(client, block, tx, _db, redisClient, redisKey, publishable,
_lock, _synced, returnValChan)
func(tx *types.Transaction) {
wp.Submit(func() {

FetchTransactionByHash(client,
block,
tx,
_db,
redis,
publishable,
status,
returnValChan)

})
}(v)

}

// Keeping track of how many of these tx fetchers succeeded & how many of them failed
result := d.ResultStatus{}
// Data received from tx fetchers, to be stored here
packedTxs := make([]*db.PackedTransaction, block.Transactions().Len())

for v := range returnValChan {
if v {
if v != nil {
result.Success++
} else {
result.Failure++
}

// #-of tx fetchers completed their job till now
//
// Either successfully or failed somehow
total := int(result.Total())
// Storing tx data received from just completed go routine
packedTxs[total-1] = v

// All go routines have completed their job
if result.Total() == uint64(block.Transactions().Len()) {
if total == block.Transactions().Len() {
break
}
}

// When all tx(s) are successfully processed ( as they have informed us over go channel ),
// we're happy to exit from this context, given that none of them failed
if result.Failure == 0 {
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s)", block.NumberU64(), len(block.Transactions())))
// Stopping job processor forcefully
// because by this time all jobs have been completed
//
// Otherwise control flow will not be able to come here
// it'll keep looping in 👆 loop, reading from channel
wp.Stop()
// -- Tx processing ending

safeUpdationOfSyncState(_lock, _synced)
// When not all tx(s) were successfully processed ( as they informed us over go channel ),
// we're exiting from this context, while putting this block number in retry queue
if !(result.Failure == 0) {

// Some tx(s) failed to get processed, so we'll put this block in retry queue
pushBlockHashIntoRedisQueue(redis, block.Number().String())
return

}

// Pushing block number into Redis queue for retrying later
// because it failed to complete some of its jobs 👆
pushBlockHashIntoRedisQueue(redisClient, redisKey, block.Number().String())
}
// Constructing block data to be persisted
//
// This is what we just published on pubsub channel
packedBlock := pubsubWorker(packedTxs)

// Attempting to persist whole block i.e. block header along with all processed tx(s)
if err := db.StoreBlock(_db, packedBlock, status); err != nil {

// If failed to persist, we'll put it in retry queue
pushBlockHashIntoRedisQueue(redis, block.Number().String())
return

}

// Updating shared variable between worker go routines, denoting progress of
// `ette`, in terms of data syncing
func safeUpdationOfSyncState(_lock *sync.Mutex, _synced *d.SyncState) {
_lock.Lock()
defer _lock.Unlock()
// Successfully processed block
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s)", block.NumberU64(), block.Transactions().Len()))
status.IncrementBlocksProcessed()

_synced.Done++
}
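
The controlled-concurrency pattern used above — a bounded worker pool fanning tx jobs out, with a buffered channel fanning results back in — can be shown standalone. This is a sketch with placeholder job/result types, not `ette`'s actual code:

```
package main

import (
	"fmt"
	"runtime"

	"github.com/gammazero/workerpool"
)

// result stands in for *db.PackedTransaction
type result struct{ id int }

func main() {
	concurrencyFactor := 2
	jobs := 10

	wp := workerpool.New(runtime.NumCPU() * concurrencyFactor)
	// Buffered so workers rarely block on send while the pool is saturated
	results := make(chan *result, runtime.NumCPU()*concurrencyFactor)

	for i := 0; i < jobs; i++ {
		i := i

		wp.Submit(func() {
			// A failed job would send nil instead, like a failed tx fetch
			results <- &result{id: i}
		})
	}

	// Fan-in: read exactly as many results as jobs submitted
	for done := 0; done < jobs; done++ {
		if r := <-results; r != nil {
			fmt.Println("[+] Processed", r.id)
		}
	}

	// All jobs have completed by now, safe to tear the pool down
	wp.Stop()
}
```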
