This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit
Merge pull request #56 from itzmeanjan/develop
Block Processor Queue to pick next block ( failed before ) efficiently
itzmeanjan authored Apr 4, 2021
2 parents 1a87cac + e7e9257 commit 71754fd
Showing 23 changed files with 1,343 additions and 915 deletions.
6 changes: 3 additions & 3 deletions Makefile
@@ -1,3 +1,5 @@
+SHELL:=/bin/bash
+
 proto_clean:
 	rm -rfv app/pb
 
@@ -6,9 +8,7 @@ proto_gen:
 	protoc -I app/proto/ --go_out=paths=source_relative:app/pb app/proto/*.proto
 
 graphql_gen:
-	pushd app/rest
-	gqlgen generate
-	popd
+	pushd app/rest; gqlgen generate; popd
 
 build:
 	go build -o ette
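Note on this change: make runs every recipe line in its own shell, so in the old three-line recipe the pushd never affected the line that actually ran gqlgen generate; joining the commands with ; keeps them in a single shell invocation. The new SHELL:=/bin/bash line is what makes the one-liner legal at all, since pushd/popd are bash builtins rather than POSIX sh commands. A POSIX-portable alternative (a sketch, not what this commit does) would drop the bash dependency entirely:

graphql_gen:
	cd app/rest && gqlgen generate

Because the cd happens inside the recipe's own subshell, no popd or cd-back is needed afterwards.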
99 changes: 28 additions & 71 deletions app/app.go
@@ -5,110 +5,62 @@ import (
 	"log"
 	"os"
 	"os/signal"
-	"sync"
 	"syscall"
 	"time"
 
-	"github.com/go-redis/redis/v8"
 	"github.com/gookit/color"
 	blk "github.com/itzmeanjan/ette/app/block"
 	cfg "github.com/itzmeanjan/ette/app/config"
-	d "github.com/itzmeanjan/ette/app/data"
 	"github.com/itzmeanjan/ette/app/db"
 
 	"github.com/itzmeanjan/ette/app/rest"
-	"github.com/itzmeanjan/ette/app/rest/graph"
-	srv "github.com/itzmeanjan/ette/app/services"
 	ss "github.com/itzmeanjan/ette/app/snapshot"
-	"gorm.io/gorm"
 )
 
-// Setting ground up
-func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConnection, *redis.Client, *gorm.DB, *d.StatusHolder) {
-	err := cfg.Read(configFile)
-	if err != nil {
-		log.Fatalf("[!] Failed to read `.env` : %s\n", err.Error())
-	}
-
-	if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "2" || cfg.Get("EtteMode") == "3" || cfg.Get("EtteMode") == "4" || cfg.Get("EtteMode") == "5") {
-		log.Fatalf("[!] Failed to find `EtteMode` in configuration file\n")
-	}
-
-	// Maintaining both HTTP & Websocket based connection to blockchain
-	_connection := &d.BlockChainNodeConnection{
-		RPC:       getClient(true),
-		Websocket: getClient(false),
-	}
-
-	_redisClient := getRedisClient()
-
-	if _redisClient == nil {
-		log.Fatalf("[!] Failed to connect to Redis Server\n")
-	}
-
-	if err := _redisClient.FlushAll(context.Background()).Err(); err != nil {
-		log.Printf("[!] Failed to flush all keys from redis : %s\n", err.Error())
-	}
-
-	_db := db.Connect()
-
-	// Populating subscription plans from `.plans.json` into
-	// database table, at application start up
-	db.PersistAllSubscriptionPlans(_db, subscriptionPlansFile)
-
-	// Passing db handle, to graph package, so that it can be used
-	// for resolving graphQL queries
-	graph.GetDatabaseConnection(_db)
-
-	_status := &d.StatusHolder{
-		State: &d.SyncState{
-			BlockCountAtStartUp:     db.GetBlockCount(_db),
-			MaxBlockNumberAtStartUp: db.GetCurrentBlockNumber(_db),
-		},
-		Mutex: &sync.RWMutex{},
-	}
-
-	return _connection, _redisClient, _db, _status
-}
-
 // Run - Application to be invoked from main runner using this function
 func Run(configFile, subscriptionPlansFile string) {
-	_connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile)
-	_redisInfo := d.RedisInfo{
-		Client:                 _redisClient,
-		BlockRetryQueue:        "blocks_in_retry_queue",
-		BlockRetryCountTable:   "attempt_count_tracker_table",
-		UnfinalizedBlocksQueue: "unfinalized_blocks",
-	}
+	ctx, cancel := context.WithCancel(context.Background())
+	_connection, _redisClient, _redisInfo, _db, _status, _queue := bootstrap(configFile, subscriptionPlansFile)
 
 	// Attempting to listen to Ctrl+C signal
 	// and when received gracefully shutting down `ette`
 	interruptChan := make(chan os.Signal, 1)
-	signal.Notify(interruptChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
+	signal.Notify(interruptChan, syscall.SIGTERM, syscall.SIGINT)
 
 	// All resources being used gets cleaned up
 	// when we're returning from this function scope
 	go func() {
 
 		<-interruptChan
 
+		// This call should be received in all places
+		// where root context is passed along
+		//
+		// But only it's being used in block processor queue
+		// go routine, as of now
+		//
+		// @note This can ( needs to ) be improved
+		cancel()
+
 		sql, err := _db.DB()
 		if err != nil {
-			log.Printf(color.Red.Sprintf("[!] Failed to get underlying DB connection : %s", err.Error()))
+			log.Print(color.Red.Sprintf("[!] Failed to get underlying DB connection : %s", err.Error()))
 			return
 		}
 
 		if err := sql.Close(); err != nil {
-			log.Printf(color.Red.Sprintf("[!] Failed to close underlying DB connection : %s", err.Error()))
+			log.Print(color.Red.Sprintf("[!] Failed to close underlying DB connection : %s", err.Error()))
 			return
 		}
 
 		if err := _redisInfo.Client.Close(); err != nil {
-			log.Printf(color.Red.Sprintf("[!] Failed to close connection to Redis : %s", err.Error()))
+			log.Print(color.Red.Sprintf("[!] Failed to close connection to Redis : %s", err.Error()))
 			return
 		}
 
 		// Stopping process
-		log.Printf(color.Magenta.Sprintf("\n[+] Gracefully shut down `ette`"))
+		log.Print(color.Magenta.Sprintf("\n[+] Gracefully shut down `ette`"))
 		os.Exit(0)
 
 	}()
@@ -131,9 +83,9 @@ func Run(configFile, subscriptionPlansFile string) {
 		// taking snapshot, this might take some time
 		_ret := ss.TakeSnapshot(_db, _snapshotFile, db.GetCurrentOldestBlockNumber(_db), db.GetCurrentBlockNumber(_db), _status.BlockCountInDB())
 		if _ret {
-			log.Printf(color.Green.Sprintf("[+] Snapshotted in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _status.BlockCountInDB()))
+			log.Print(color.Green.Sprintf("[+] Snapshotted in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _status.BlockCountInDB()))
 		} else {
-			log.Printf(color.Red.Sprintf("[!] Snapshotting failed in : %s", time.Now().UTC().Sub(_start)))
+			log.Print(color.Red.Sprintf("[!] Snapshotting failed in : %s", time.Now().UTC().Sub(_start)))
 		}
 
 		return
@@ -152,19 +104,24 @@ func Run(configFile, subscriptionPlansFile string) {
 		_, _count := ss.RestoreFromSnapshot(_db, _snapshotFile)
 
-		log.Printf(color.Green.Sprintf("[+] Restored from snapshot in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _count))
+		log.Print(color.Green.Sprintf("[+] Restored from snapshot in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _count))
 
 		return
 
 	}
 
+	go _queue.Start(ctx)
+
 	// Pushing block header propagation listener to another thread of execution
-	go blk.SubscribeToNewBlocks(_connection, _db, _status, &_redisInfo)
+	go blk.SubscribeToNewBlocks(_connection, _db, _status, _redisInfo, _queue)
 
 	// Periodic clean up job being started, to be run every 24 hours to clean up
 	// delivery history data, older than 24 hours
-	go srv.DeliveryHistoryCleanUpService(_db)
+	//
+	// @note Need to be diagnosed, why it doesn't work
+	// go srv.DeliveryHistoryCleanUpService(_db)
 
 	// Starting http server on main thread
 	rest.RunHTTPServer(_db, _status, _redisClient)
 
 }
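Two behavioural notes on the app.go changes above. First, dropping syscall.SIGKILL from signal.Notify loses nothing: SIGKILL can never be caught, blocked, or handled by a process, so registering it was always a no-op. Second, the root context created in Run is now cancelled from the interrupt handler, and the new block processor queue goroutine (started via go _queue.Start(ctx)) observes that cancellation. A minimal, self-contained sketch of the same shutdown pattern (all names here are illustrative, not ette's):

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// worker stands in for the block processor queue goroutine: it loops
// until the root context is cancelled, then signals completion.
func worker(ctx context.Context, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case <-ctx.Done():
			return
		default:
			time.Sleep(50 * time.Millisecond) // pretend to process a block
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// SIGKILL is intentionally absent: the kernel never delivers it to a
	// handler, so listing it in signal.Notify has no effect.
	interruptChan := make(chan os.Signal, 1)
	signal.Notify(interruptChan, syscall.SIGTERM, syscall.SIGINT)

	done := make(chan struct{})
	go worker(ctx, done)

	<-interruptChan // block until SIGINT/SIGTERM arrives
	cancel()        // broadcast shutdown to everything holding ctx
	<-done          // wait for the worker to wind down cleanly
	log.Print("gracefully shut down")
}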
96 changes: 39 additions & 57 deletions app/block/block.go
@@ -1,46 +1,55 @@
 package block
 
 import (
-	"fmt"
 	"log"
 	"runtime"
 	"time"
 
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/gammazero/workerpool"
-	"github.com/gookit/color"
 	cfg "github.com/itzmeanjan/ette/app/config"
 	d "github.com/itzmeanjan/ette/app/data"
 	"github.com/itzmeanjan/ette/app/db"
+	q "github.com/itzmeanjan/ette/app/queue"
 	"gorm.io/gorm"
 )
 
 // HasBlockFinalized - Checking whether block under processing i.e. `number`
 // has `N` confirmations on top of it or not
 func HasBlockFinalized(status *d.StatusHolder, number uint64) bool {
 
 	return status.GetLatestBlockNumber()-cfg.GetBlockConfirmations() >= number
 
 }
 
 // ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data
-func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) bool {
+func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, queue *q.BlockProcessorQueue, status *d.StatusHolder, startingAt time.Time) bool {
 
 	// Closure managing publishing whole block data i.e. block header, txn(s), event logs
 	// on redis pubsub channel
-	pubsubWorker := func(txns []*db.PackedTransaction) *db.PackedBlock {
+	pubsubWorker := func(txns []*db.PackedTransaction) (*db.PackedBlock, bool) {
 
 		// Constructing block data to published & persisted
 		packedBlock := BuildPackedBlock(block, txns)
 
+		// -- 3 step pub/sub attempt
+		//
 		// Attempting to publish whole block data to redis pubsub channel
 		// when eligible `EtteMode` is set
 		if publishable && (cfg.Get("EtteMode") == "2" || cfg.Get("EtteMode") == "3") {
-			PublishBlock(packedBlock, redis)
+
+			// 1. Asking queue whether we need to publish block or not
+			if !queue.CanPublish(block.NumberU64()) {
+				return packedBlock, true
+			}
+
+			// 2. Attempting to publish block on Pub/Sub topic
+			if !PublishBlock(packedBlock, redis) {
+				return nil, false
+			}
+
+			// 3. Marking this block as published
+			if !queue.Published(block.NumberU64()) {
+				return nil, false
+			}
+
 		}
+		// -- done, with publishing on Pub/Sub topic
 
-		return packedBlock
+		return packedBlock, true
 
 	}

@@ -49,45 +58,34 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 	// Constructing block data to be persisted
 	//
 	// This is what we just published on pubsub channel
-	packedBlock := pubsubWorker(nil)
+	packedBlock, ok := pubsubWorker(nil)
+	if !ok {
+		return false
+	}
 
 	// If `ette` being run in mode, for only publishing data to
 	// pubsub channel, no need to persist data
 	//
 	// We simply publish & return from execution scope
 	if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") {
 
-		log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
+		log.Printf("✅ Block %d with 0 tx(s) [ Took : %s ]\n", block.NumberU64(), time.Now().UTC().Sub(startingAt))
 		status.IncrementBlocksProcessed()
 
 		return true
 
 	}
 
 	// If block doesn't contain any tx, we'll attempt to persist only block
-	if err := db.StoreBlock(_db, packedBlock, status); err != nil {
+	if err := db.StoreBlock(_db, packedBlock, status, queue); err != nil {
 
-		log.Print(color.Red.Sprintf("[+] Failed to process block %d with 0 tx(s) : %s [ Took : %s ]", block.NumberU64(), err.Error(), time.Now().UTC().Sub(startingAt)))
-
-		// If failed to persist, we'll put it in retry queue
-		PushBlockIntoRetryQueue(redis, block.Number().String())
+		log.Printf("❗️ Failed to process block %d : %s\n", block.NumberU64(), err.Error())
 		return false
 
 	}
 
-	if !HasBlockFinalized(status, packedBlock.Block.Number) {
-
-		log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))
-
-		// Pushing into unfinalized block queue, to be picked up only when
-		// finality for this block has been achieved
-		PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
-		return true
-
-	}
-
 	// Successfully processed block
-	log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
+	log.Printf("✅ Block %d with 0 tx(s) [ Took : %s ]\n", block.NumberU64(), time.Now().UTC().Sub(startingAt))
 	status.IncrementBlocksProcessed()
 
 	return true
@@ -160,57 +158,41 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
 	wp.Stop()
 	// -- Tx processing ending
 
-	// When all tx(s) aren't successfully processed ( as they have informed us over go channel ),
-	// we're exiting from this context, while putting this block number in retry queue
 	if !(result.Failure == 0) {
-
-		PushBlockIntoRetryQueue(redis, block.Number().String())
 		return false
-
 	}
 
 	// Constructing block data to be persisted
 	//
 	// This is what we just published on pubsub channel
-	packedBlock := pubsubWorker(packedTxs)
+	packedBlock, ok := pubsubWorker(packedTxs)
+	if !ok {
+		return false
+	}
 
 	// If `ette` being run in mode, for only publishing data to
 	// pubsub channel, no need to persist data
 	//
 	// We simply publish & return from execution scope
 	if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") {
 
-		log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))
+		log.Printf("✅ Block %d with %d tx(s) [ Took : %s ]\n", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt))
 		status.IncrementBlocksProcessed()
 
 		return true
 
 	}
 
 	// If block doesn't contain any tx, we'll attempt to persist only block
-	if err := db.StoreBlock(_db, packedBlock, status); err != nil {
+	if err := db.StoreBlock(_db, packedBlock, status, queue); err != nil {
 
-		log.Print(color.Red.Sprintf("[+] Failed to process block %d with %d tx(s) : %s [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), err.Error(), time.Now().UTC().Sub(startingAt)))
-
-		// If failed to persist, we'll put it in retry queue
-		PushBlockIntoRetryQueue(redis, block.Number().String())
+		log.Printf("❗️ Failed to process block %d : %s\n", block.NumberU64(), err.Error())
 		return false
 
 	}
 
-	if !HasBlockFinalized(status, packedBlock.Block.Number) {
-
-		log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))
-
-		// Pushing into unfinalized block queue, to be picked up only when
-		// finality for this block has been achieved
-		PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
-		return true
-
-	}
-
 	// Successfully processed block
-	log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))
+	log.Printf("✅ Block %d with %d tx(s) [ Took : %s ]\n", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt))
 
 	status.IncrementBlocksProcessed()
 	return true
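The three-step publish protocol in pubsubWorker is the heart of this change: before publishing, the block processor asks the queue whether the block still needs publishing (CanPublish), so a block that previously failed at the persistence stage can be retried without emitting a duplicate Pub/Sub message; after a successful publish it acknowledges with Published. For context, HasBlockFinalized treats a block as final once latest - confirmations >= number; e.g. with BlockConfirmations set to 3 and the chain tip at 103, block 100 is considered final. The queue package itself is outside this excerpt; below is a minimal sketch of the surface these call sites imply (the internals are assumptions, not ette's actual implementation):

// Package queue: a sketch of the API used by app/block in this diff,
// inferred from the call sites Start(ctx), CanPublish(n) and Published(n).
package queue

import (
	"context"
	"sync"
)

// BlockProcessorQueue remembers, per block number, whether that block's
// data has already been published on the Pub/Sub topic. A block which
// later fails (e.g. while being persisted) can then be retried without
// publishing duplicates. Internals here are illustrative only.
type BlockProcessorQueue struct {
	mu        sync.Mutex
	published map[uint64]bool
}

func New() *BlockProcessorQueue {
	return &BlockProcessorQueue{published: make(map[uint64]bool)}
}

// CanPublish reports whether block `number` still needs to be published.
func (q *BlockProcessorQueue) CanPublish(number uint64) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	return !q.published[number]
}

// Published marks block `number` as published, returning true on success.
func (q *BlockProcessorQueue) Published(number uint64) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.published[number] = true
	return true
}

// Start is the long-running goroutine launched from Run via
// `go _queue.Start(ctx)`; here it only waits for shutdown, whereas the
// real implementation would also schedule retries of failed blocks.
func (q *BlockProcessorQueue) Start(ctx context.Context) {
	<-ctx.Done()
}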
