diff --git a/Makefile b/Makefile index d33aeb41..5c727fbd 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ +SHELL:=/bin/bash + proto_clean: rm -rfv app/pb @@ -6,9 +8,7 @@ proto_gen: protoc -I app/proto/ --go_out=paths=source_relative:app/pb app/proto/*.proto graphql_gen: - pushd app/rest - gqlgen generate - popd + pushd app/rest; gqlgen generate; popd build: go build -o ette diff --git a/app/app.go b/app/app.go index d852910e..25834f35 100644 --- a/app/app.go +++ b/app/app.go @@ -5,85 +5,28 @@ import ( "log" "os" "os/signal" - "sync" "syscall" "time" - "github.com/go-redis/redis/v8" "github.com/gookit/color" blk "github.com/itzmeanjan/ette/app/block" cfg "github.com/itzmeanjan/ette/app/config" - d "github.com/itzmeanjan/ette/app/data" "github.com/itzmeanjan/ette/app/db" + "github.com/itzmeanjan/ette/app/rest" - "github.com/itzmeanjan/ette/app/rest/graph" - srv "github.com/itzmeanjan/ette/app/services" ss "github.com/itzmeanjan/ette/app/snapshot" - "gorm.io/gorm" ) -// Setting ground up -func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConnection, *redis.Client, *gorm.DB, *d.StatusHolder) { - err := cfg.Read(configFile) - if err != nil { - log.Fatalf("[!] Failed to read `.env` : %s\n", err.Error()) - } - - if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "2" || cfg.Get("EtteMode") == "3" || cfg.Get("EtteMode") == "4" || cfg.Get("EtteMode") == "5") { - log.Fatalf("[!] Failed to find `EtteMode` in configuration file\n") - } - - // Maintaining both HTTP & Websocket based connection to blockchain - _connection := &d.BlockChainNodeConnection{ - RPC: getClient(true), - Websocket: getClient(false), - } - - _redisClient := getRedisClient() - - if _redisClient == nil { - log.Fatalf("[!] Failed to connect to Redis Server\n") - } - - if err := _redisClient.FlushAll(context.Background()).Err(); err != nil { - log.Printf("[!] Failed to flush all keys from redis : %s\n", err.Error()) - } - - _db := db.Connect() - - // Populating subscription plans from `.plans.json` into - // database table, at application start up - db.PersistAllSubscriptionPlans(_db, subscriptionPlansFile) - - // Passing db handle, to graph package, so that it can be used - // for resolving graphQL queries - graph.GetDatabaseConnection(_db) - - _status := &d.StatusHolder{ - State: &d.SyncState{ - BlockCountAtStartUp: db.GetBlockCount(_db), - MaxBlockNumberAtStartUp: db.GetCurrentBlockNumber(_db), - }, - Mutex: &sync.RWMutex{}, - } - - return _connection, _redisClient, _db, _status -} - // Run - Application to be invoked from main runner using this function func Run(configFile, subscriptionPlansFile string) { - _connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile) - _redisInfo := d.RedisInfo{ - Client: _redisClient, - BlockRetryQueue: "blocks_in_retry_queue", - BlockRetryCountTable: "attempt_count_tracker_table", - UnfinalizedBlocksQueue: "unfinalized_blocks", - } + + ctx, cancel := context.WithCancel(context.Background()) + _connection, _redisClient, _redisInfo, _db, _status, _queue := bootstrap(configFile, subscriptionPlansFile) // Attempting to listen to Ctrl+C signal // and when received gracefully shutting down `ette` interruptChan := make(chan os.Signal, 1) - signal.Notify(interruptChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + signal.Notify(interruptChan, syscall.SIGTERM, syscall.SIGINT) // All resources being used gets cleaned up // when we're returning from this function scope @@ -91,24 +34,33 @@ func Run(configFile, subscriptionPlansFile string) { <-interruptChan + // This call should be received in all places + // where root context is passed along + // + // But only it's being used in block processor queue + // go routine, as of now + // + // @note This can ( needs to ) be improved + cancel() + sql, err := _db.DB() if err != nil { - log.Printf(color.Red.Sprintf("[!] Failed to get underlying DB connection : %s", err.Error())) + log.Print(color.Red.Sprintf("[!] Failed to get underlying DB connection : %s", err.Error())) return } if err := sql.Close(); err != nil { - log.Printf(color.Red.Sprintf("[!] Failed to close underlying DB connection : %s", err.Error())) + log.Print(color.Red.Sprintf("[!] Failed to close underlying DB connection : %s", err.Error())) return } if err := _redisInfo.Client.Close(); err != nil { - log.Printf(color.Red.Sprintf("[!] Failed to close connection to Redis : %s", err.Error())) + log.Print(color.Red.Sprintf("[!] Failed to close connection to Redis : %s", err.Error())) return } // Stopping process - log.Printf(color.Magenta.Sprintf("\n[+] Gracefully shut down `ette`")) + log.Print(color.Magenta.Sprintf("\n[+] Gracefully shut down `ette`")) os.Exit(0) }() @@ -131,9 +83,9 @@ func Run(configFile, subscriptionPlansFile string) { // taking snapshot, this might take some time _ret := ss.TakeSnapshot(_db, _snapshotFile, db.GetCurrentOldestBlockNumber(_db), db.GetCurrentBlockNumber(_db), _status.BlockCountInDB()) if _ret { - log.Printf(color.Green.Sprintf("[+] Snapshotted in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _status.BlockCountInDB())) + log.Print(color.Green.Sprintf("[+] Snapshotted in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _status.BlockCountInDB())) } else { - log.Printf(color.Red.Sprintf("[!] Snapshotting failed in : %s", time.Now().UTC().Sub(_start))) + log.Print(color.Red.Sprintf("[!] Snapshotting failed in : %s", time.Now().UTC().Sub(_start))) } return @@ -152,19 +104,24 @@ func Run(configFile, subscriptionPlansFile string) { _, _count := ss.RestoreFromSnapshot(_db, _snapshotFile) - log.Printf(color.Green.Sprintf("[+] Restored from snapshot in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _count)) + log.Print(color.Green.Sprintf("[+] Restored from snapshot in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _count)) return } + go _queue.Start(ctx) + // Pushing block header propagation listener to another thread of execution - go blk.SubscribeToNewBlocks(_connection, _db, _status, &_redisInfo) + go blk.SubscribeToNewBlocks(_connection, _db, _status, _redisInfo, _queue) // Periodic clean up job being started, to be run every 24 hours to clean up // delivery history data, older than 24 hours - go srv.DeliveryHistoryCleanUpService(_db) + // + // @note Need to be diagnosed, why it doesn't work + // go srv.DeliveryHistoryCleanUpService(_db) // Starting http server on main thread rest.RunHTTPServer(_db, _status, _redisClient) + } diff --git a/app/block/block.go b/app/block/block.go index 4465431d..42af8c90 100644 --- a/app/block/block.go +++ b/app/block/block.go @@ -1,7 +1,6 @@ package block import ( - "fmt" "log" "runtime" "time" @@ -9,38 +8,48 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/gammazero/workerpool" - "github.com/gookit/color" cfg "github.com/itzmeanjan/ette/app/config" d "github.com/itzmeanjan/ette/app/data" "github.com/itzmeanjan/ette/app/db" + q "github.com/itzmeanjan/ette/app/queue" "gorm.io/gorm" ) -// HasBlockFinalized - Checking whether block under processing i.e. `number` -// has `N` confirmations on top of it or not -func HasBlockFinalized(status *d.StatusHolder, number uint64) bool { - - return status.GetLatestBlockNumber()-cfg.GetBlockConfirmations() >= number - -} - // ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data -func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) bool { +func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, queue *q.BlockProcessorQueue, status *d.StatusHolder, startingAt time.Time) bool { // Closure managing publishing whole block data i.e. block header, txn(s), event logs // on redis pubsub channel - pubsubWorker := func(txns []*db.PackedTransaction) *db.PackedBlock { + pubsubWorker := func(txns []*db.PackedTransaction) (*db.PackedBlock, bool) { // Constructing block data to published & persisted packedBlock := BuildPackedBlock(block, txns) + // -- 3 step pub/sub attempt + // // Attempting to publish whole block data to redis pubsub channel // when eligible `EtteMode` is set if publishable && (cfg.Get("EtteMode") == "2" || cfg.Get("EtteMode") == "3") { - PublishBlock(packedBlock, redis) + + // 1. Asking queue whether we need to publish block or not + if !queue.CanPublish(block.NumberU64()) { + return packedBlock, true + } + + // 2. Attempting to publish block on Pub/Sub topic + if !PublishBlock(packedBlock, redis) { + return nil, false + } + + // 3. Marking this block as published + if !queue.Published(block.NumberU64()) { + return nil, false + } + } + // -- done, with publishing on Pub/Sub topic - return packedBlock + return packedBlock, true } @@ -49,7 +58,10 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // Constructing block data to be persisted // // This is what we just published on pubsub channel - packedBlock := pubsubWorker(nil) + packedBlock, ok := pubsubWorker(nil) + if !ok { + return false + } // If `ette` being run in mode, for only publishing data to // pubsub channel, no need to persist data @@ -57,7 +69,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // We simply publish & return from execution scope if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") { - log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt))) + log.Printf("✅ Block %d with 0 tx(s) [ Took : %s ]\n", block.NumberU64(), time.Now().UTC().Sub(startingAt)) status.IncrementBlocksProcessed() return true @@ -65,29 +77,15 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm } // If block doesn't contain any tx, we'll attempt to persist only block - if err := db.StoreBlock(_db, packedBlock, status); err != nil { - - log.Print(color.Red.Sprintf("[+] Failed to process block %d with 0 tx(s) : %s [ Took : %s ]", block.NumberU64(), err.Error(), time.Now().UTC().Sub(startingAt))) + if err := db.StoreBlock(_db, packedBlock, status, queue); err != nil { - // If failed to persist, we'll put it in retry queue - PushBlockIntoRetryQueue(redis, block.Number().String()) + log.Printf("❗️ Failed to process block %d : %s\n", block.NumberU64(), err.Error()) return false } - if !HasBlockFinalized(status, packedBlock.Block.Number) { - - log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) - - // Pushing into unfinalized block queue, to be picked up only when - // finality for this block has been achieved - PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number)) - return true - - } - // Successfully processed block - log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt))) + log.Printf("✅ Block %d with 0 tx(s) [ Took : %s ]\n", block.NumberU64(), time.Now().UTC().Sub(startingAt)) status.IncrementBlocksProcessed() return true @@ -160,19 +158,17 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm wp.Stop() // -- Tx processing ending - // When all tx(s) aren't successfully processed ( as they have informed us over go channel ), - // we're exiting from this context, while putting this block number in retry queue if !(result.Failure == 0) { - - PushBlockIntoRetryQueue(redis, block.Number().String()) return false - } // Constructing block data to be persisted // // This is what we just published on pubsub channel - packedBlock := pubsubWorker(packedTxs) + packedBlock, ok := pubsubWorker(packedTxs) + if !ok { + return false + } // If `ette` being run in mode, for only publishing data to // pubsub channel, no need to persist data @@ -180,7 +176,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // We simply publish & return from execution scope if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") { - log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt))) + log.Printf("✅ Block %d with %d tx(s) [ Took : %s ]\n", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)) status.IncrementBlocksProcessed() return true @@ -188,29 +184,15 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm } // If block doesn't contain any tx, we'll attempt to persist only block - if err := db.StoreBlock(_db, packedBlock, status); err != nil { + if err := db.StoreBlock(_db, packedBlock, status, queue); err != nil { - log.Print(color.Red.Sprintf("[+] Failed to process block %d with %d tx(s) : %s [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), err.Error(), time.Now().UTC().Sub(startingAt))) - - // If failed to persist, we'll put it in retry queue - PushBlockIntoRetryQueue(redis, block.Number().String()) + log.Printf("❗️ Failed to process block %d : %s\n", block.NumberU64(), err.Error()) return false } - if !HasBlockFinalized(status, packedBlock.Block.Number) { - - log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) - - // Pushing into unfinalized block queue, to be picked up only when - // finality for this block has been achieved - PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number)) - return true - - } - // Successfully processed block - log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt))) + log.Printf("✅ Block %d with %d tx(s) [ Took : %s ]\n", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)) status.IncrementBlocksProcessed() return true diff --git a/app/block/fetch.go b/app/block/fetch.go index 3e28c4ba..8dfb01c0 100644 --- a/app/block/fetch.go +++ b/app/block/fetch.go @@ -2,7 +2,6 @@ package block import ( "context" - "fmt" "log" "math/big" "time" @@ -10,33 +9,32 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" - "github.com/gookit/color" d "github.com/itzmeanjan/ette/app/data" "github.com/itzmeanjan/ette/app/db" + q "github.com/itzmeanjan/ette/app/queue" "gorm.io/gorm" ) // FetchBlockByHash - Fetching block content using blockHash -func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string, _db *gorm.DB, redis *d.RedisInfo, _status *d.StatusHolder) { +func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string, _db *gorm.DB, redis *d.RedisInfo, queue *q.BlockProcessorQueue, _status *d.StatusHolder) bool { // Starting block processing at startingAt := time.Now().UTC() block, err := client.BlockByHash(context.Background(), hash) if err != nil { - // Pushing block number into Redis queue for retrying later - PushBlockIntoRetryQueue(redis, number) - log.Print(color.Red.Sprintf("[!] Failed to fetch block by hash [ block : %s] : %s", number, err.Error())) - return + log.Printf("❗️ Failed to fetch block %s : %s\n", number, err.Error()) + return false + } - ProcessBlockContent(client, block, _db, redis, true, _status, startingAt) + return ProcessBlockContent(client, block, _db, redis, true, queue, _status, startingAt) } // FetchBlockByNumber - Fetching block content using block number -func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, redis *d.RedisInfo, publishable bool, _status *d.StatusHolder) { +func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, redis *d.RedisInfo, publishable bool, queue *q.BlockProcessorQueue, _status *d.StatusHolder) bool { // Starting block processing at startingAt := time.Now().UTC() @@ -46,22 +44,14 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r block, err := client.BlockByNumber(context.Background(), _num) if err != nil { - // Pushing block number into Redis queue for retrying later - PushBlockIntoRetryQueue(redis, fmt.Sprintf("%d", number)) - log.Print(color.Red.Sprintf("[!] Failed to fetch block by number [ block : %d ] : %s", number, err)) - return - } - - // If attempt to process block by number went successful - // we can consider removing this block number's entry from - // attempt count tracker table - if ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt) { - - RemoveBlockFromAttemptCountTrackerTable(redis, fmt.Sprintf("%d", number)) + log.Printf("❗️ Failed to fetch block %d : %s\n", number, err) + return false } + return ProcessBlockContent(client, block, _db, redis, publishable, queue, _status, startingAt) + } // FetchTransactionByHash - Fetching specific transaction related data, tries to publish data if required @@ -71,7 +61,7 @@ func FetchTransactionByHash(client *ethclient.Client, block *types.Block, tx *ty receipt, err := client.TransactionReceipt(context.Background(), tx.Hash()) if err != nil { - log.Print(color.Red.Sprintf("[!] Failed to fetch tx receipt [ block : %d ] : %s", block.NumberU64(), err.Error())) + log.Printf("❗️ Failed to fetch tx receipt [ block : %d ] : %s\n", block.NumberU64(), err.Error()) // Passing nil, to denote, failed to fetch all tx data // from blockchain node @@ -81,7 +71,7 @@ func FetchTransactionByHash(client *ethclient.Client, block *types.Block, tx *ty sender, err := client.TransactionSender(context.Background(), tx, block.Hash(), receipt.TransactionIndex) if err != nil { - log.Print(color.Red.Sprintf("[!] Failed to fetch tx sender [ block : %d ] : %s", block.NumberU64(), err.Error())) + log.Printf("❗️ Failed to fetch tx sender [ block : %d ] : %s\n", block.NumberU64(), err.Error()) // Passing nil, to denote, failed to fetch all tx data // from blockchain node diff --git a/app/block/listener.go b/app/block/listener.go index 26e11a7a..ead72658 100644 --- a/app/block/listener.go +++ b/app/block/listener.go @@ -9,21 +9,21 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/gammazero/workerpool" - "github.com/gookit/color" cfg "github.com/itzmeanjan/ette/app/config" d "github.com/itzmeanjan/ette/app/data" + q "github.com/itzmeanjan/ette/app/queue" "gorm.io/gorm" ) // SubscribeToNewBlocks - Listen for event when new block header is // available, then fetch block content ( including all transactions ) // in different worker -func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, status *d.StatusHolder, redis *d.RedisInfo) { +func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, status *d.StatusHolder, redis *d.RedisInfo, queue *q.BlockProcessorQueue) { headerChan := make(chan *types.Header) subs, err := connection.Websocket.SubscribeNewHead(context.Background(), headerChan) if err != nil { - log.Fatal(color.Red.Sprintf("[!] Failed to subscribe to block headers : %s", err.Error())) + log.Fatalf("❗️ Failed to subscribe to block headers : %s\n", err.Error()) } // Scheduling unsubscribe, to be executed when end of this execution scope is reached defer subs.Unsubscribe() @@ -43,7 +43,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, select { case err := <-subs.Err(): - log.Fatal(color.Red.Sprintf("[!] Listener stopped : %s", err.Error())) + log.Fatalf("❗️ Listener stopped : %s\n", err.Error()) case header := <-headerChan: @@ -51,7 +51,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // should be greater than max block number obtained from DB if first && !(header.Number.Uint64() > status.MaxBlockNumberAtStartUp()) { - log.Fatal(color.Red.Sprintf("[!] Bad block received : expected > `%d`\n", status.MaxBlockNumberAtStartUp())) + log.Fatalf("❗️ Bad block received : expected > `%d`\n", status.MaxBlockNumberAtStartUp()) } @@ -65,7 +65,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // It can be improved. if !first && header.Number.Uint64() > status.GetLatestBlockNumber()+1 { - log.Fatal(color.Red.Sprintf("[!] Bad block received %d, expected %d", header.Number.Uint64(), status.GetLatestBlockNumber())) + log.Fatalf("❗️ Bad block received %d, expected %d\n", header.Number.Uint64(), status.GetLatestBlockNumber()) } @@ -74,15 +74,16 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // reorganization, we'll attempt to process this new block if !first && !(header.Number.Uint64() == status.GetLatestBlockNumber()+1) { - log.Printf(color.Blue.Sprintf("[*] Received block %d again, expected %d, attempting to process", header.Number.Uint64(), status.GetLatestBlockNumber()+1)) + log.Printf("🔅 Received block %d again, expected %d\n", header.Number.Uint64(), status.GetLatestBlockNumber()+1) } else { - log.Printf(color.Blue.Sprintf("[*] Received block %d, attempting to process", header.Number.Uint64())) + log.Printf("🔆 Received block %d\n", header.Number.Uint64()) } status.SetLatestBlockNumber(header.Number.Uint64()) + queue.Latest(header.Number.Uint64()) if first { @@ -92,7 +93,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // Starting go routine for fetching blocks `ette` failed to process in previous attempt // // Uses Redis backed queue for fetching pending block hash & retries - go RetryQueueManager(connection.RPC, _db, redis, status) + go RetryQueueManager(connection.RPC, _db, redis, queue, status) // If historical data query features are enabled // only then we need to sync to latest state of block chain @@ -111,14 +112,14 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // some reorg, in the time duration, when `ette` was offline // // So we've to take a look at those - to := status.MaxBlockNumberAtStartUp() - cfg.GetBlockConfirmations() - - // block number can never be negative - if to < 0 { + var to uint64 + if status.MaxBlockNumberAtStartUp() < cfg.GetBlockConfirmations() { to = 0 + } else { + to = status.MaxBlockNumberAtStartUp() - cfg.GetBlockConfirmations() } - go SyncBlocksByRange(connection.RPC, _db, redis, from, to, status) + go SyncBlocksByRange(connection.RPC, _db, redis, queue, from, to, status) } // Making sure that when next latest block header is received, it'll not @@ -137,7 +138,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // // Though it'll be picked up sometime in future ( by missing block finder ), but it can be safely handled now // so that it gets processed immediately - func(blockHash common.Hash, blockNumber uint64) { + func(blockHash common.Hash, blockNumber uint64, _queue *q.BlockProcessorQueue) { // When only processing blocks in real-time mode // no need to check what's present in unfinalized block number queue @@ -145,53 +146,32 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // real-time subscription mechanism if cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3" { - // Attempting to submit all blocks to job processor queue - // if more blocks are present in non-final queue, than actually - // should be - for GetUnfinalizedQueueLength(redis) > int64(cfg.GetBlockConfirmations()) { - - // Before submitting new block processing job - // checking whether there exists any block in unfinalized - // block queue or not - // - // If yes, we're attempting to process it, because it has now - // achieved enough confirmations - if CheckIfOldestBlockIsConfirmed(redis, status) { - - oldest := PopOldestBlockFromUnfinalizedQueue(redis) + // Next block which can be attempted to be checked + // while finally considering it confirmed & put into DB + if nxt, ok := _queue.ConfirmedNext(); ok { - log.Print(color.Yellow.Sprintf("[*] Attempting to process finalised block %d [ Latest Block : %d | In Queue : %d ]", oldest, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) + log.Printf("🔅 Processing finalised block %d [ Latest Block : %d ]\n", nxt, status.GetLatestBlockNumber()) - // Taking `oldest` variable's copy in local scope of closure, so that during - // iteration over queue elements, none of them get missed, becuase we're - // dealing with concurrent system, where previous `oldest` can be overwritten - // by new `oldest` & we end up missing a block - func(_oldestBlock uint64) { + // Taking `oldest` variable's copy in local scope of closure, so that during + // iteration over queue elements, none of them get missed, becuase we're + // dealing with concurrent system, where previous `oldest` can be overwritten + // by new `oldest` & we end up missing a block + func(_oldestBlock uint64, _queue *q.BlockProcessorQueue) { - wp.Submit(func() { + wp.Submit(func() { - FetchBlockByNumber(connection.RPC, - _oldestBlock, - _db, - redis, - false, - status) + if !FetchBlockByNumber(connection.RPC, _oldestBlock, _db, redis, false, queue, status) { - }) + _queue.ConfirmedFailed(_oldestBlock) + return - }(oldest) + } - } else { + _queue.ConfirmedDone(_oldestBlock) - // If left most block is not yet finalized, it'll attempt to - // reorganize that queue so that other blocks waiting to be processed - // can get that opportunity - // - // This situation generally occurs due to concurrent pattern implemented - // in block processor - MoveUnfinalizedOldestBlockToEnd(redis) + }) - } + }(nxt, _queue) } @@ -199,16 +179,22 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, wp.Submit(func() { - FetchBlockByHash(connection.RPC, - blockHash, - fmt.Sprintf("%d", blockNumber), - _db, - redis, - status) + if !_queue.Put(blockNumber) { + return + } + + if !FetchBlockByHash(connection.RPC, blockHash, fmt.Sprintf("%d", blockNumber), _db, redis, queue, status) { + + _queue.UnconfirmedFailed(blockNumber) + return + + } + + _queue.UnconfirmedDone(blockNumber) }) - }(header.Hash(), header.Number.Uint64()) + }(header.Hash(), header.Number.Uint64(), queue) } } diff --git a/app/block/publish.go b/app/block/publish.go deleted file mode 100644 index 401ba061..00000000 --- a/app/block/publish.go +++ /dev/null @@ -1,154 +0,0 @@ -package block - -import ( - "context" - "log" - - "github.com/gookit/color" - d "github.com/itzmeanjan/ette/app/data" - "github.com/itzmeanjan/ette/app/db" -) - -// PublishBlock - Attempts to publish block data to Redis pubsub channel -func PublishBlock(block *db.PackedBlock, redis *d.RedisInfo) { - - if block == nil { - return - } - - if err := redis.Client.Publish(context.Background(), "block", &d.Block{ - Hash: block.Block.Hash, - Number: block.Block.Number, - Time: block.Block.Time, - ParentHash: block.Block.ParentHash, - Difficulty: block.Block.Difficulty, - GasUsed: block.Block.GasUsed, - GasLimit: block.Block.GasLimit, - Nonce: block.Block.Nonce, - Miner: block.Block.Miner, - Size: block.Block.Size, - StateRootHash: block.Block.StateRootHash, - UncleHash: block.Block.UncleHash, - TransactionRootHash: block.Block.TransactionRootHash, - ReceiptRootHash: block.Block.ReceiptRootHash, - ExtraData: block.Block.ExtraData, - }).Err(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to publish block %d in channel : %s", block.Block.Number, err.Error())) - return - } - - log.Printf(color.LightMagenta.Sprintf("[*] Published block %d", block.Block.Number)) - - PublishTxs(block.Block.Number, block.Transactions, redis) - -} - -// PublishTxs - Publishes all transactions in a block to redis pubsub -// channel -func PublishTxs(blockNumber uint64, txs []*db.PackedTransaction, redis *d.RedisInfo) { - - if txs == nil { - return - } - - var eventCount uint64 - - for _, t := range txs { - PublishTx(blockNumber, t, redis) - - // how many events are present in this block, in total - eventCount += uint64(len(t.Events)) - } - - log.Printf(color.LightMagenta.Sprintf("[*] Published %d transactions of block %d", len(txs), blockNumber)) - log.Printf(color.LightMagenta.Sprintf("[*] Published %d events of block %d", eventCount, blockNumber)) - -} - -// PublishTx - Publishes tx & events in tx, related data to respective -// Redis pubsub channel -func PublishTx(blockNumber uint64, tx *db.PackedTransaction, redis *d.RedisInfo) { - - if tx == nil { - return - } - - var pTx *d.Transaction - - if tx.Tx.To == "" { - // This is a contract creation tx - pTx = &d.Transaction{ - Hash: tx.Tx.Hash, - From: tx.Tx.From, - Contract: tx.Tx.Contract, - Value: tx.Tx.Value, - Data: tx.Tx.Data, - Gas: tx.Tx.Gas, - GasPrice: tx.Tx.GasPrice, - Cost: tx.Tx.Cost, - Nonce: tx.Tx.Nonce, - State: tx.Tx.State, - BlockHash: tx.Tx.BlockHash, - } - } else { - // This is a normal tx, so we keep contract field empty - pTx = &d.Transaction{ - Hash: tx.Tx.Hash, - From: tx.Tx.From, - To: tx.Tx.To, - Value: tx.Tx.Value, - Data: tx.Tx.Data, - Gas: tx.Tx.Gas, - GasPrice: tx.Tx.GasPrice, - Cost: tx.Tx.Cost, - Nonce: tx.Tx.Nonce, - State: tx.Tx.State, - BlockHash: tx.Tx.BlockHash, - } - } - - if err := redis.Client.Publish(context.Background(), "transaction", pTx).Err(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to publish transaction from block %d : %s", blockNumber, err.Error())) - return - } - - PublishEvents(blockNumber, tx.Events, redis) - -} - -// PublishEvents - Iterate over all events & try to publish them on -// redis pubsub channel -func PublishEvents(blockNumber uint64, events []*db.Events, redis *d.RedisInfo) { - - if events == nil { - return - } - - for _, e := range events { - PublishEvent(blockNumber, e, redis) - } - -} - -// PublishEvent - Publishing event/ log entry to redis pub-sub topic, to be captured by subscribers -// and sent to client application, who are interested in this piece of data -// after applying filter -func PublishEvent(blockNumber uint64, event *db.Events, redis *d.RedisInfo) { - - if event == nil { - return - } - - if err := redis.Client.Publish(context.Background(), "event", &d.Event{ - Origin: event.Origin, - Index: event.Index, - Topics: event.Topics, - Data: event.Data, - TransactionHash: event.TransactionHash, - BlockHash: event.BlockHash, - }).Err(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to publish event from block %d : %s", blockNumber, err.Error())) - return - } - -} diff --git a/app/block/publish_block.go b/app/block/publish_block.go new file mode 100644 index 00000000..0cff1bd7 --- /dev/null +++ b/app/block/publish_block.go @@ -0,0 +1,47 @@ +package block + +import ( + "context" + "log" + + d "github.com/itzmeanjan/ette/app/data" + "github.com/itzmeanjan/ette/app/db" +) + +// PublishBlock - Attempts to publish block data to Redis pubsub channel +func PublishBlock(block *db.PackedBlock, redis *d.RedisInfo) bool { + + if block == nil { + return false + } + + _block := &d.Block{ + Hash: block.Block.Hash, + Number: block.Block.Number, + Time: block.Block.Time, + ParentHash: block.Block.ParentHash, + Difficulty: block.Block.Difficulty, + GasUsed: block.Block.GasUsed, + GasLimit: block.Block.GasLimit, + Nonce: block.Block.Nonce, + Miner: block.Block.Miner, + Size: block.Block.Size, + StateRootHash: block.Block.StateRootHash, + UncleHash: block.Block.UncleHash, + TransactionRootHash: block.Block.TransactionRootHash, + ReceiptRootHash: block.Block.ReceiptRootHash, + ExtraData: block.Block.ExtraData, + } + + if err := redis.Client.Publish(context.Background(), redis.BlockPublishTopic, _block).Err(); err != nil { + + log.Printf("❗️ Failed to publish block %d : %s\n", block.Block.Number, err.Error()) + return false + + } + + log.Printf("📎 Published block %d\n", block.Block.Number) + + return PublishTxs(block.Block.Number, block.Transactions, redis) + +} diff --git a/app/block/publish_event.go b/app/block/publish_event.go new file mode 100644 index 00000000..1f05c15e --- /dev/null +++ b/app/block/publish_event.go @@ -0,0 +1,61 @@ +package block + +import ( + "context" + "log" + + d "github.com/itzmeanjan/ette/app/data" + "github.com/itzmeanjan/ette/app/db" +) + +// PublishEvents - Iterate over all events & try to publish them on +// redis pubsub channel +func PublishEvents(blockNumber uint64, events []*db.Events, redis *d.RedisInfo) bool { + + if events == nil { + return false + } + + var status bool + + for _, e := range events { + + status = PublishEvent(blockNumber, e, redis) + if !status { + break + } + + } + + return status + +} + +// PublishEvent - Publishing event/ log entry to redis pub-sub topic, to be captured by subscribers +// and sent to client application, who are interested in this piece of data +// after applying filter +func PublishEvent(blockNumber uint64, event *db.Events, redis *d.RedisInfo) bool { + + if event == nil { + return false + } + + data := &d.Event{ + Origin: event.Origin, + Index: event.Index, + Topics: event.Topics, + Data: event.Data, + TransactionHash: event.TransactionHash, + BlockHash: event.BlockHash, + } + + if err := redis.Client.Publish(context.Background(), redis.EventPublishTopic, data).Err(); err != nil { + + log.Printf("❗️ Failed to publish event from block %d : %s\n", blockNumber, err.Error()) + return false + + } + + return true + +} diff --git a/app/block/publish_tx.go b/app/block/publish_tx.go new file mode 100644 index 00000000..6ffe7007 --- /dev/null +++ b/app/block/publish_tx.go @@ -0,0 +1,96 @@ +package block + +import ( + "context" + "log" + + d "github.com/itzmeanjan/ette/app/data" + "github.com/itzmeanjan/ette/app/db" +) + +// PublishTxs - Publishes all transactions in a block to redis pubsub +// channel +func PublishTxs(blockNumber uint64, txs []*db.PackedTransaction, redis *d.RedisInfo) bool { + + if txs == nil { + return false + } + + var eventCount uint64 + var status bool + + for _, t := range txs { + + status = PublishTx(blockNumber, t, redis) + if !status { + break + } + + // how many events are present in this block, in total + eventCount += uint64(len(t.Events)) + + } + + if !status { + return status + } + + log.Printf("📎 Published %d transactions of block %d\n", len(txs), blockNumber) + log.Printf("📎 Published %d events of block %d\n", eventCount, blockNumber) + + return status + +} + +// PublishTx - Publishes tx & events in tx, related data to respective +// Redis pubsub channel +func PublishTx(blockNumber uint64, tx *db.PackedTransaction, redis *d.RedisInfo) bool { + + if tx == nil { + return false + } + + var pTx *d.Transaction + + if tx.Tx.To == "" { + // This is a contract creation tx + pTx = &d.Transaction{ + Hash: tx.Tx.Hash, + From: tx.Tx.From, + Contract: tx.Tx.Contract, + Value: tx.Tx.Value, + Data: tx.Tx.Data, + Gas: tx.Tx.Gas, + GasPrice: tx.Tx.GasPrice, + Cost: tx.Tx.Cost, + Nonce: tx.Tx.Nonce, + State: tx.Tx.State, + BlockHash: tx.Tx.BlockHash, + } + } else { + // This is a normal tx, so we keep contract field empty + pTx = &d.Transaction{ + Hash: tx.Tx.Hash, + From: tx.Tx.From, + To: tx.Tx.To, + Value: tx.Tx.Value, + Data: tx.Tx.Data, + Gas: tx.Tx.Gas, + GasPrice: tx.Tx.GasPrice, + Cost: tx.Tx.Cost, + Nonce: tx.Tx.Nonce, + State: tx.Tx.State, + BlockHash: tx.Tx.BlockHash, + } + } + + if err := redis.Client.Publish(context.Background(), redis.TxPublishTopic, pTx).Err(); err != nil { + + log.Printf("❗️ Failed to publish transaction from block %d : %s\n", blockNumber, err.Error()) + return false + + } + + return PublishEvents(blockNumber, tx.Events, redis) + +} diff --git a/app/block/retry.go b/app/block/retry.go index 3a0d1f8e..1fc97d80 100644 --- a/app/block/retry.go +++ b/app/block/retry.go @@ -1,19 +1,15 @@ package block import ( - "context" "log" "runtime" - "strconv" "time" "github.com/ethereum/go-ethereum/ethclient" "github.com/gammazero/workerpool" - _redis "github.com/go-redis/redis/v8" - "github.com/gookit/color" cfg "github.com/itzmeanjan/ette/app/config" - "github.com/itzmeanjan/ette/app/data" d "github.com/itzmeanjan/ette/app/data" + q "github.com/itzmeanjan/ette/app/queue" "gorm.io/gorm" ) @@ -23,9 +19,9 @@ import ( // Sleeps for 1000 milliseconds // // Keeps repeating -func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) { +func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *d.RedisInfo, queue *q.BlockProcessorQueue, status *d.StatusHolder) { sleep := func() { - time.Sleep(time.Duration(100) * time.Millisecond) + time.Sleep(time.Duration(512) * time.Millisecond) } // Creating worker pool and submitting jobs as soon as it's determined @@ -36,165 +32,33 @@ func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.Redis for { sleep() - // Popping oldest element from Redis queue - blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueue).Result() - if err != nil { + block, ok := queue.UnconfirmedNext() + if !ok { continue } - attemptCount, _ := GetAttemptCountFromTable(redis, blockNumber) - if attemptCount != 0 && attemptCount%2 != 0 { - - PushBlockIntoRetryQueue(redis, blockNumber) - continue - - } - - // Parsing string blockNumber to uint64 - parsedBlockNumber, err := strconv.ParseUint(blockNumber, 10, 64) - if err != nil { - continue - } - - log.Print(color.Cyan.Sprintf("[~] Retrying block : %d [ In Queue : %d ]", parsedBlockNumber, GetRetryQueueLength(redis))) + stat := queue.Stat() + log.Printf("ℹ️ Retrying block : %d [ Unconfirmed : ( Progress : %d, Waiting : %d ) | Confirmed : ( Progress : %d, Waiting : %d ) | Total : %d ]\n", block, stat.UnconfirmedProgress, stat.UnconfirmedWaiting, stat.ConfirmedProgress, stat.ConfirmedWaiting, stat.Total) // Submitting block processor job into pool // which will be picked up & processed // // This will stop us from blindly creating too many go routines - func(_blockNumber uint64) { + func(_blockNumber uint64, queue *q.BlockProcessorQueue) { wp.Submit(func() { - // This check helps us in determining whether we should - // consider sending notification over pubsub channel for this block - // whose processing failed due to some reasons in last attempt - if status.MaxBlockNumberAtStartUp() <= _blockNumber { + if !FetchBlockByNumber(client, _blockNumber, _db, redis, true, queue, status) { - FetchBlockByNumber(client, _blockNumber, _db, redis, true, status) + queue.UnconfirmedFailed(_blockNumber) return } - FetchBlockByNumber(client, _blockNumber, _db, redis, false, status) + queue.UnconfirmedDone(_blockNumber) }) - }(parsedBlockNumber) - } -} - -// PushBlockIntoRetryQueue - Pushes failed to fetch block number at end of Redis queue -// given it has not already been added -func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) { - // Checking presence first & then deciding whether to add it or not - if !CheckBlockInRetryQueue(redis, blockNumber) { - - if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueue, blockNumber).Result(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to push block %s into retry queue : %s", blockNumber, err.Error())) - } - - IncrementAttemptCountOfBlockNumber(redis, blockNumber) - - } -} - -// IncrementAttemptCountOfBlockNumber - Given block number, increments failed attempt count -// of processing this block -// -// If block doesn't yet exist in tracker table, it'll be inserted first time & counter to be set to 0 -// -// It'll be wrapped back to 0 as soon as it reaches 100 -func IncrementAttemptCountOfBlockNumber(redis *data.RedisInfo, blockNumber string) { - - var wrappedAttemptCount int - - // Attempting to increment 👇, only when it's not first time - // when this attempt counter for block number being initialized - // - // So this ensures for first time it gets initialized to 0 - attemptCount, err := GetAttemptCountFromTable(redis, blockNumber) - if err == nil { - wrappedAttemptCount = (int(attemptCount) + 1) % 100 - } - - if _, err := redis.Client.HSet(context.Background(), redis.BlockRetryCountTable, blockNumber, wrappedAttemptCount).Result(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to increment attempt count of block %s : %s", blockNumber, err.Error())) - } - -} - -// CheckBlockInAttemptCounterTable - Checks whether given block number already exist in -// attempt count tracker table -func CheckBlockInAttemptCounterTable(redis *data.RedisInfo, blockNumber string) bool { - - if _, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil { - return false - } - - return true - -} - -// GetAttemptCountFromTable - Returns current attempt counter from table -// for given block number -func GetAttemptCountFromTable(redis *data.RedisInfo, blockNumber string) (uint64, error) { - - count, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result() - if err != nil { - return 0, err - } - - parsedCount, err := strconv.ParseUint(count, 10, 64) - if err != nil { - return 0, err - } - - return parsedCount, nil - -} - -// RemoveBlockFromAttemptCountTrackerTable - Attempt to delete block number's -// associated attempt count, given it already exists in table -// -// This is supposed to be invoked when a block is considered to be successfully processed -func RemoveBlockFromAttemptCountTrackerTable(redis *data.RedisInfo, blockNumber string) { - - if CheckBlockInAttemptCounterTable(redis, blockNumber) { - - if _, err := redis.Client.HDel(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to delete attempt count of successful block %s : %s", blockNumber, err.Error())) - } - + }(block, queue) } - -} - -// CheckBlockInRetryQueue - Checks whether block number is already added in -// Redis backed retry queue or not -// -// If yes, it'll not be added again -// -// Note: this feature of checking index of value in redis queue, -// was added in Redis v6.0.6 : https://redis.io/commands/lpos -func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool { - - if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueue, blockNumber, _redis.LPosArgs{}).Result(); err != nil { - return false - } - - return true - -} - -// GetRetryQueueLength - Returns redis backed retry queue length -func GetRetryQueueLength(redis *data.RedisInfo) int64 { - - blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueue).Result() - if err != nil { - log.Printf(color.Red.Sprintf("[!] Failed to determine retry queue length : %s", err.Error())) - } - - return blockCount - } diff --git a/app/block/syncer.go b/app/block/syncer.go index fddebfd5..9f9ff3e9 100644 --- a/app/block/syncer.go +++ b/app/block/syncer.go @@ -1,7 +1,6 @@ package block import ( - "fmt" "log" "runtime" "sort" @@ -11,9 +10,9 @@ import ( "github.com/gammazero/workerpool" "github.com/gookit/color" cfg "github.com/itzmeanjan/ette/app/config" - "github.com/itzmeanjan/ette/app/data" d "github.com/itzmeanjan/ette/app/data" "github.com/itzmeanjan/ette/app/db" + q "github.com/itzmeanjan/ette/app/queue" "gorm.io/gorm" ) @@ -46,7 +45,7 @@ func FindMissingBlocksInRange(found []uint64, from uint64, to uint64) []uint64 { // while running n workers concurrently, where n = number of cores this machine has // // Waits for all of them to complete -func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromBlock uint64, toBlock uint64, status *d.StatusHolder, jd func(*workerpool.WorkerPool, *d.Job)) { +func Syncer(client *ethclient.Client, _db *gorm.DB, redis *d.RedisInfo, queue *q.BlockProcessorQueue, fromBlock uint64, toBlock uint64, status *d.StatusHolder, jd func(*workerpool.WorkerPool, *d.Job, *q.BlockProcessorQueue)) { if !(fromBlock <= toBlock) { log.Print(color.Red.Sprintf("[!] Bad block range for syncer")) return @@ -63,7 +62,7 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB Redis: redis, Block: num, Status: status, - }) + }, queue) } // attempting to fetch X blocks ( max ) at a time, by range @@ -81,7 +80,7 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB blocks := db.GetAllBlockNumbersInRange(_db, i, toShouldbe) // No blocks present in DB, in queried range - if blocks == nil || len(blocks) == 0 { + if len(blocks) == 0 { // So submitting all of them to job processor queue for j := i; j <= toShouldbe; j++ { @@ -113,40 +112,38 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB // // Range can be either ascending or descending, depending upon that proper arguments to be // passed to `Syncer` function during invokation -func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromBlock uint64, toBlock uint64, status *d.StatusHolder) { +func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redis *d.RedisInfo, queue *q.BlockProcessorQueue, fromBlock uint64, toBlock uint64, status *d.StatusHolder) { // Job to be submitted and executed by each worker // // Job specification is provided in `Job` struct - job := func(wp *workerpool.WorkerPool, j *d.Job) { + job := func(wp *workerpool.WorkerPool, j *d.Job, queue *q.BlockProcessorQueue) { wp.Submit(func() { - if !HasBlockFinalized(status, j.Block) { - - log.Print(color.LightRed.Sprintf("[x] Non-final block %d [ Latest Block : %d | In Queue : %d ]", j.Block, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) - - // Pushing into unfinalized block queue, to be picked up only when - // finality for this block has been achieved - PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", j.Block)) + if !queue.Put(j.Block) { return + } + if !FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, false, queue, j.Status) { + queue.UnconfirmedFailed(j.Block) + return } - FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, false, j.Status) + queue.UnconfirmedDone(j.Block) }) } - log.Printf("[*] Starting block syncer\n") + log.Printf("✅ Starting block syncer\n") if fromBlock < toBlock { - Syncer(client, _db, redis, fromBlock, toBlock, status, job) + Syncer(client, _db, redis, queue, fromBlock, toBlock, status, job) } else { - Syncer(client, _db, redis, toBlock, fromBlock, status, job) + Syncer(client, _db, redis, queue, toBlock, fromBlock, status, job) } - log.Printf("[+] Stopping block syncer\n") + log.Printf("✅ Stopping block syncer\n") // Once completed first iteration of processing blocks upto last time where it left // off, we're going to start worker to look at DB & decide which blocks are missing @@ -154,20 +151,17 @@ func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redis *data.Redis // // And this will itself run as a infinite job, completes one iteration & // takes break for 1 min, then repeats - go SyncMissingBlocksInDB(client, _db, redis, status) + go SyncMissingBlocksInDB(client, _db, redis, queue, status) + } // SyncMissingBlocksInDB - Checks with database for what blocks are present & what are not, fetches missing // blocks & related data iteratively -func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) { - - // Sleep for 1 minute & then again check whether we need to fetch missing blocks or not - sleep := func() { - time.Sleep(time.Duration(1) * time.Minute) - } +func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redis *d.RedisInfo, queue *q.BlockProcessorQueue, status *d.StatusHolder) { for { - log.Printf("[*] Starting missing block finder\n") + + log.Printf("✅ Starting missing block finder\n") currentBlockNumber := db.GetCurrentBlockNumber(_db) @@ -177,43 +171,45 @@ func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redis *data.R // If all blocks present in between 0 to latest block in network // `ette` sleeps for 1 minute & again get to work if currentBlockNumber+1 == blockCount { - log.Print(color.Green.Sprintf("[+] No missing blocks found")) - sleep() + log.Printf("✅ No missing blocks found\n") + + <-time.After(time.Duration(1) * time.Minute) continue } // Job to be submitted and executed by each worker // // Job specification is provided in `Job` struct - job := func(wp *workerpool.WorkerPool, j *d.Job) { + job := func(wp *workerpool.WorkerPool, j *d.Job, queue *q.BlockProcessorQueue) { wp.Submit(func() { - if !HasBlockFinalized(status, j.Block) { - - log.Print(color.LightRed.Sprintf("[x] Non-final block %d [ Latest Block : %d | In Queue : %d ]", j.Block, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) - - // Pushing into unfinalized block queue, to be picked up only when - // finality for this block has been achieved - PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", j.Block)) + // Worker fetches block by number from local storage + block := db.GetBlock(j.DB, j.Block) + if !(block == nil) { return + } + if !queue.Put(j.Block) { + return } - // Worker fetches block by number from local storage - block := db.GetBlock(j.DB, j.Block) - if block == nil && !CheckBlockInRetryQueue(redis, fmt.Sprintf("%d", j.Block)) { - // If not found, block fetching cycle is run, for this block - FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, false, j.Status) + if !FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, false, queue, j.Status) { + queue.UnconfirmedFailed(j.Block) + return } + queue.UnconfirmedDone(j.Block) + }) + } - Syncer(client, _db, redis, 0, currentBlockNumber, status, job) + Syncer(client, _db, redis, queue, 0, currentBlockNumber, status, job) + + log.Printf("✅ Stopping missing block finder\n") + <-time.After(time.Duration(1) * time.Minute) - log.Printf("[+] Stopping missing block finder\n") - sleep() } } diff --git a/app/block/unfinalized_blocks.go b/app/block/unfinalized_blocks.go deleted file mode 100644 index 4fdfd5c0..00000000 --- a/app/block/unfinalized_blocks.go +++ /dev/null @@ -1,113 +0,0 @@ -package block - -import ( - "context" - "fmt" - "log" - "strconv" - - _redis "github.com/go-redis/redis/v8" - "github.com/gookit/color" - "github.com/itzmeanjan/ette/app/data" -) - -// GetOldestBlockFromUnfinalizedQueue - Attempts to read left most element of Redis backed non-final block queue -// i.e. element at 0th index -func GetOldestBlockFromUnfinalizedQueue(redis *data.RedisInfo) string { - - blockNumber, err := redis.Client.LIndex(context.Background(), redis.UnfinalizedBlocksQueue, 0).Result() - if err != nil { - return "" - } - - return blockNumber - -} - -// CheckIfOldestBlockIsConfirmed - Given oldest block number present in redis backed unfinalized queue -// checks whether this block has yet reached finality or not -func CheckIfOldestBlockIsConfirmed(redis *data.RedisInfo, status *data.StatusHolder) bool { - - oldest := GetOldestBlockFromUnfinalizedQueue(redis) - if oldest == "" { - return false - } - - parsedOldestBlockNumber, err := strconv.ParseUint(oldest, 10, 64) - if err != nil { - return false - } - - return HasBlockFinalized(status, parsedOldestBlockNumber) - -} - -// PopOldestBlockFromUnfinalizedQueue - Pops oldest block i.e. left most block from non-final block number -// queue, which can be processed now -func PopOldestBlockFromUnfinalizedQueue(redis *data.RedisInfo) uint64 { - - blockNumber, err := redis.Client.LPop(context.Background(), redis.UnfinalizedBlocksQueue).Result() - if err != nil { - return 0 - } - - parsedBlockNumber, err := strconv.ParseUint(blockNumber, 10, 64) - if err != nil { - return 0 - } - - return parsedBlockNumber - -} - -// PushBlockIntoUnfinalizedQueue - Pushes block number, which has not yet reached finality, into -// Redis backed queue, which will be poped out only when `N` block -// confirmations achieved on top of it -func PushBlockIntoUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) { - // Checking presence first & then deciding whether to add it or not - if !CheckBlockInUnfinalizedQueue(redis, blockNumber) { - - if _, err := redis.Client.RPush(context.Background(), redis.UnfinalizedBlocksQueue, blockNumber).Result(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to push block %s into non-final block queue : %s", blockNumber, err.Error())) - } - - } -} - -// MoveUnfinalizedOldestBlockToEnd - Attempts to pop oldest block ( i.e. left most block ) -// from unfinalized queue & pushes it back to end of queue, so that other blocks waiting after -// this one can get be attempted to be processed by workers -// -// @note This can be improved using `LMOVE` command of Redis ( >= 6.2.0 ) -func MoveUnfinalizedOldestBlockToEnd(redis *data.RedisInfo) { - - PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", PopOldestBlockFromUnfinalizedQueue(redis))) - -} - -// CheckBlockInUnfinalizedQueue - Checks whether block number is already added in -// Redis backed unfinalized queue or not -// -// If yes, it'll not be added again -// -// Note: this feature of checking index of value in redis queue, -// was added in Redis v6.0.6 : https://redis.io/commands/lpos -func CheckBlockInUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) bool { - if _, err := redis.Client.LPos(context.Background(), redis.UnfinalizedBlocksQueue, blockNumber, _redis.LPosArgs{}).Result(); err != nil { - return false - } - - return true -} - -// GetUnfinalizedQueueLength - Returns redis backed unfinalized block number queue length -func GetUnfinalizedQueueLength(redis *data.RedisInfo) int64 { - - blockCount, err := redis.Client.LLen(context.Background(), redis.UnfinalizedBlocksQueue).Result() - if err != nil { - log.Printf(color.Red.Sprintf("[!] Failed to determine non-final block queue length : %s", err.Error())) - } - - return blockCount - -} diff --git a/app/data/block.go b/app/data/block.go new file mode 100644 index 00000000..ebc631b5 --- /dev/null +++ b/app/data/block.go @@ -0,0 +1,88 @@ +package data + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "log" +) + +// Block - Block related info to be delivered to client in this format +type Block struct { + Hash string `json:"hash" gorm:"column:hash"` + Number uint64 `json:"number" gorm:"column:number"` + Time uint64 `json:"time" gorm:"column:time"` + ParentHash string `json:"parentHash" gorm:"column:parenthash"` + Difficulty string `json:"difficulty" gorm:"column:difficulty"` + GasUsed uint64 `json:"gasUsed" gorm:"column:gasused"` + GasLimit uint64 `json:"gasLimit" gorm:"column:gaslimit"` + Nonce string `json:"nonce" gorm:"column:nonce"` + Miner string `json:"miner" gorm:"column:miner"` + Size float64 `json:"size" gorm:"column:size"` + StateRootHash string `json:"stateRootHash" gorm:"column:stateroothash"` + UncleHash string `json:"uncleHash" gorm:"column:unclehash"` + TransactionRootHash string `json:"txRootHash" gorm:"column:txroothash"` + ReceiptRootHash string `json:"receiptRootHash" gorm:"column:receiptroothash"` + ExtraData []byte `json:"extraData" gorm:"column:extradata"` +} + +// MarshalBinary - Implementing binary marshalling function, to be invoked +// by redis before publishing data on channel +func (b *Block) MarshalBinary() ([]byte, error) { + return json.Marshal(b) +} + +// MarshalJSON - Custom JSON encoder +func (b *Block) MarshalJSON() ([]byte, error) { + + extraData := "" + if _h := hex.EncodeToString(b.ExtraData); _h != "" { + extraData = fmt.Sprintf("0x%s", _h) + } + + return []byte(fmt.Sprintf(`{"hash":%q,"number":%d,"time":%d,"parentHash":%q,"difficulty":%q,"gasUsed":%d,"gasLimit":%d,"nonce":%q,"miner":%q,"size":%f,"stateRootHash":%q,"uncleHash":%q,"txRootHash":%q,"receiptRootHash":%q,"extraData":%q}`, + b.Hash, + b.Number, + b.Time, + b.ParentHash, + b.Difficulty, + b.GasUsed, + b.GasLimit, + b.Nonce, + b.Miner, + b.Size, + b.StateRootHash, + b.UncleHash, + b.TransactionRootHash, + b.ReceiptRootHash, + extraData)), nil + +} + +// ToJSON - Encodes into JSON, to be supplied when queried for block data +func (b *Block) ToJSON() []byte { + data, err := json.Marshal(b) + if err != nil { + log.Printf("[!] Failed to encode block data to JSON : %s\n", err.Error()) + return nil + } + + return data +} + +// Blocks - A set of blocks to be held, extracted from DB query result +// also to be supplied to client in JSON encoded form +type Blocks struct { + Blocks []*Block `json:"blocks"` +} + +// ToJSON - Encoding into JSON, to be invoked when delivering query result to client +func (b *Blocks) ToJSON() []byte { + data, err := json.Marshal(b) + if err != nil { + log.Printf("[!] Failed to encode block data to JSON : %s\n", err.Error()) + return nil + } + + return data +} diff --git a/app/data/data.go b/app/data/data.go index d1aa8a4d..4c330010 100644 --- a/app/data/data.go +++ b/app/data/data.go @@ -1,17 +1,11 @@ package data import ( - "encoding/hex" - "encoding/json" - "fmt" - "log" - "strings" "sync" "time" "github.com/ethereum/go-ethereum/ethclient" "github.com/go-redis/redis/v8" - "github.com/lib/pq" "gorm.io/gorm" ) @@ -128,19 +122,14 @@ func (s *StatusHolder) SetLatestBlockNumber(num uint64) { defer s.Mutex.Unlock() s.State.LatestBlockNumber = num - return } // RedisInfo - Holds redis related information in this struct, to be used // when passing to functions as argument type RedisInfo struct { - Client *redis.Client // using this object `ette` will talk to Redis - BlockRetryQueue string // retry queue name, for storing block numbers - BlockRetryCountTable string // keeping track of how many times this block was attempted - // to be processed in past, but went unsuccessful - UnfinalizedBlocksQueue string // stores unfinalized block numbers, processes - // them later after reaching finality ( as set by deployer of `ette` ) + Client *redis.Client // using this object `ette` will talk to Redis + BlockPublishTopic, TxPublishTopic, EventPublishTopic string } // ResultStatus - Keeps track of how many operations went successful @@ -176,223 +165,3 @@ type BlockChainNodeConnection struct { RPC *ethclient.Client Websocket *ethclient.Client } - -// Block - Block related info to be delivered to client in this format -type Block struct { - Hash string `json:"hash" gorm:"column:hash"` - Number uint64 `json:"number" gorm:"column:number"` - Time uint64 `json:"time" gorm:"column:time"` - ParentHash string `json:"parentHash" gorm:"column:parenthash"` - Difficulty string `json:"difficulty" gorm:"column:difficulty"` - GasUsed uint64 `json:"gasUsed" gorm:"column:gasused"` - GasLimit uint64 `json:"gasLimit" gorm:"column:gaslimit"` - Nonce string `json:"nonce" gorm:"column:nonce"` - Miner string `json:"miner" gorm:"column:miner"` - Size float64 `json:"size" gorm:"column:size"` - StateRootHash string `json:"stateRootHash" gorm:"column:stateroothash"` - UncleHash string `json:"uncleHash" gorm:"column:unclehash"` - TransactionRootHash string `json:"txRootHash" gorm:"column:txroothash"` - ReceiptRootHash string `json:"receiptRootHash" gorm:"column:receiptroothash"` - ExtraData []byte `json:"extraData" gorm:"column:extradata"` -} - -// MarshalBinary - Implementing binary marshalling function, to be invoked -// by redis before publishing data on channel -func (b *Block) MarshalBinary() ([]byte, error) { - return json.Marshal(b) -} - -// MarshalJSON - Custom JSON encoder -func (b *Block) MarshalJSON() ([]byte, error) { - - extraData := "" - if _h := hex.EncodeToString(b.ExtraData); _h != "" { - extraData = fmt.Sprintf("0x%s", _h) - } - - return []byte(fmt.Sprintf(`{"hash":%q,"number":%d,"time":%d,"parentHash":%q,"difficulty":%q,"gasUsed":%d,"gasLimit":%d,"nonce":%q,"miner":%q,"size":%f,"stateRootHash":%q,"uncleHash":%q,"txRootHash":%q,"receiptRootHash":%q,"extraData":%q}`, - b.Hash, - b.Number, - b.Time, - b.ParentHash, - b.Difficulty, - b.GasUsed, - b.GasLimit, - b.Nonce, - b.Miner, - b.Size, - b.StateRootHash, - b.UncleHash, - b.TransactionRootHash, - b.ReceiptRootHash, - extraData)), nil - -} - -// ToJSON - Encodes into JSON, to be supplied when queried for block data -func (b *Block) ToJSON() []byte { - data, err := json.Marshal(b) - if err != nil { - log.Printf("[!] Failed to encode block data to JSON : %s\n", err.Error()) - return nil - } - - return data -} - -// Blocks - A set of blocks to be held, extracted from DB query result -// also to be supplied to client in JSON encoded form -type Blocks struct { - Blocks []*Block `json:"blocks"` -} - -// ToJSON - Encoding into JSON, to be invoked when delivering query result to client -func (b *Blocks) ToJSON() []byte { - data, err := json.Marshal(b) - if err != nil { - log.Printf("[!] Failed to encode block data to JSON : %s\n", err.Error()) - return nil - } - - return data -} - -// Transaction - Transaction holder struct, to be supplied when queried using tx hash -type Transaction struct { - Hash string `json:"hash" gorm:"column:hash"` - From string `json:"from" gorm:"column:from"` - To string `json:"to" gorm:"column:to"` - Contract string `json:"contract" gorm:"column:contract"` - Value string `json:"value" gorm:"column:value"` - Data []byte `json:"data" gorm:"column:data"` - Gas uint64 `json:"gas" gorm:"column:gas"` - GasPrice string `json:"gasPrice" gorm:"column:gasprice"` - Cost string `json:"cost" gorm:"column:cost"` - Nonce uint64 `json:"nonce" gorm:"column:nonce"` - State uint64 `json:"state" gorm:"column:state"` - BlockHash string `json:"blockHash" gorm:"column:blockhash"` -} - -// MarshalBinary - Implementing binary marshalling function, to be invoked -// by redis before publishing data on channel -func (t *Transaction) MarshalBinary() ([]byte, error) { - return json.Marshal(t) -} - -// MarshalJSON - Custom JSON encoder -func (t *Transaction) MarshalJSON() ([]byte, error) { - - data := "" - if _h := hex.EncodeToString(t.Data); _h != "" { - data = fmt.Sprintf("0x%s", _h) - } - - // When tx doesn't create contract i.e. normal tx - if !strings.HasPrefix(t.Contract, "0x") { - return []byte(fmt.Sprintf(`{"hash":%q,"from":%q,"to":%q,"value":%q,"data":%q,"gas":%d,"gasPrice":%q,"cost":%q,"nonce":%d,"state":%d,"blockHash":%q}`, - t.Hash, t.From, t.To, t.Value, - data, t.Gas, t.GasPrice, t.Cost, t.Nonce, t.State, t.BlockHash)), nil - } - - // When tx creates contract - return []byte(fmt.Sprintf(`{"hash":%q,"from":%q,"contract":%q,"value":%q,"data":%q,"gas":%d,"gasPrice":%q,"cost":%q,"nonce":%d,"state":%d,"blockHash":%q}`, - t.Hash, t.From, t.Contract, t.Value, - data, t.Gas, t.GasPrice, t.Cost, t.Nonce, t.State, t.BlockHash)), nil - -} - -// ToJSON - JSON encoder, to be invoked before delivering tx query data to client -func (t *Transaction) ToJSON() []byte { - - data, err := json.Marshal(t) - if err != nil { - log.Printf("[!] Failed to encode transaction data to JSON : %s\n", err.Error()) - return nil - } - - return data - -} - -// Transactions - Multiple transactions holder struct -type Transactions struct { - Transactions []*Transaction `json:"transactions"` -} - -// ToJSON - Encoding into JSON, to be invoked when delivering to client -func (t *Transactions) ToJSON() []byte { - - data, err := json.Marshal(t) - if err != nil { - log.Printf("[!] Failed to encode transactions data to JSON : %s\n", err.Error()) - return nil - } - - return data - -} - -// Event - Single event entity holder, extracted from db -type Event struct { - Origin string `gorm:"column:origin"` - Index uint `gorm:"column:index"` - Topics pq.StringArray `gorm:"column:topics;type:text[]"` - Data []byte `gorm:"column:data"` - TransactionHash string `gorm:"column:txhash"` - BlockHash string `gorm:"column:blockhash"` -} - -// MarshalBinary - Implementing binary marshalling function, to be invoked -// by redis before publishing data on channel -func (e *Event) MarshalBinary() ([]byte, error) { - return e.MarshalJSON() -} - -// MarshalJSON - Custom JSON encoder -func (e *Event) MarshalJSON() ([]byte, error) { - - data := "" - if _h := hex.EncodeToString(e.Data); _h != "" && _h != strings.Repeat("0", 64) { - data = fmt.Sprintf("0x%s", _h) - } - - return []byte(fmt.Sprintf(`{"origin":%q,"index":%d,"topics":%v,"data":%q,"txHash":%q,"blockHash":%q}`, - e.Origin, - e.Index, - strings.Join( - strings.Fields( - fmt.Sprintf("%q", e.Topics)), ","), - data, e.TransactionHash, e.BlockHash)), nil - -} - -// ToJSON - Encoding into JSON -func (e *Event) ToJSON() []byte { - - data, err := json.Marshal(e) - if err != nil { - log.Printf("[!] Failed to encode event to JSON : %s\n", err.Error()) - return nil - } - - return data - -} - -// Events - A collection of event holder, to be delivered to client in this form -type Events struct { - Events []*Event `json:"events"` -} - -// ToJSON - Encoding to JSON -func (e *Events) ToJSON() []byte { - - data, err := json.Marshal(e) - if err != nil { - log.Printf("[!] Failed to encode events to JSON : %s\n", err.Error()) - return nil - } - - return data - -} diff --git a/app/data/event.go b/app/data/event.go new file mode 100644 index 00000000..6cedd61a --- /dev/null +++ b/app/data/event.go @@ -0,0 +1,76 @@ +package data + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "log" + "strings" + + "github.com/lib/pq" +) + +// Event - Single event entity holder, extracted from db +type Event struct { + Origin string `gorm:"column:origin"` + Index uint `gorm:"column:index"` + Topics pq.StringArray `gorm:"column:topics;type:text[]"` + Data []byte `gorm:"column:data"` + TransactionHash string `gorm:"column:txhash"` + BlockHash string `gorm:"column:blockhash"` +} + +// MarshalBinary - Implementing binary marshalling function, to be invoked +// by redis before publishing data on channel +func (e *Event) MarshalBinary() ([]byte, error) { + return e.MarshalJSON() +} + +// MarshalJSON - Custom JSON encoder +func (e *Event) MarshalJSON() ([]byte, error) { + + data := "" + if _h := hex.EncodeToString(e.Data); _h != "" && _h != strings.Repeat("0", 64) { + data = fmt.Sprintf("0x%s", _h) + } + + return []byte(fmt.Sprintf(`{"origin":%q,"index":%d,"topics":%v,"data":%q,"txHash":%q,"blockHash":%q}`, + e.Origin, + e.Index, + strings.Join( + strings.Fields( + fmt.Sprintf("%q", e.Topics)), ","), + data, e.TransactionHash, e.BlockHash)), nil + +} + +// ToJSON - Encoding into JSON +func (e *Event) ToJSON() []byte { + + data, err := json.Marshal(e) + if err != nil { + log.Printf("[!] Failed to encode event to JSON : %s\n", err.Error()) + return nil + } + + return data + +} + +// Events - A collection of event holder, to be delivered to client in this form +type Events struct { + Events []*Event `json:"events"` +} + +// ToJSON - Encoding to JSON +func (e *Events) ToJSON() []byte { + + data, err := json.Marshal(e) + if err != nil { + log.Printf("[!] Failed to encode events to JSON : %s\n", err.Error()) + return nil + } + + return data + +} diff --git a/app/data/tx.go b/app/data/tx.go new file mode 100644 index 00000000..68180cd3 --- /dev/null +++ b/app/data/tx.go @@ -0,0 +1,84 @@ +package data + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "log" + "strings" +) + +// Transaction - Transaction holder struct, to be supplied when queried using tx hash +type Transaction struct { + Hash string `json:"hash" gorm:"column:hash"` + From string `json:"from" gorm:"column:from"` + To string `json:"to" gorm:"column:to"` + Contract string `json:"contract" gorm:"column:contract"` + Value string `json:"value" gorm:"column:value"` + Data []byte `json:"data" gorm:"column:data"` + Gas uint64 `json:"gas" gorm:"column:gas"` + GasPrice string `json:"gasPrice" gorm:"column:gasprice"` + Cost string `json:"cost" gorm:"column:cost"` + Nonce uint64 `json:"nonce" gorm:"column:nonce"` + State uint64 `json:"state" gorm:"column:state"` + BlockHash string `json:"blockHash" gorm:"column:blockhash"` +} + +// MarshalBinary - Implementing binary marshalling function, to be invoked +// by redis before publishing data on channel +func (t *Transaction) MarshalBinary() ([]byte, error) { + return json.Marshal(t) +} + +// MarshalJSON - Custom JSON encoder +func (t *Transaction) MarshalJSON() ([]byte, error) { + + data := "" + if _h := hex.EncodeToString(t.Data); _h != "" { + data = fmt.Sprintf("0x%s", _h) + } + + // When tx doesn't create contract i.e. normal tx + if !strings.HasPrefix(t.Contract, "0x") { + return []byte(fmt.Sprintf(`{"hash":%q,"from":%q,"to":%q,"value":%q,"data":%q,"gas":%d,"gasPrice":%q,"cost":%q,"nonce":%d,"state":%d,"blockHash":%q}`, + t.Hash, t.From, t.To, t.Value, + data, t.Gas, t.GasPrice, t.Cost, t.Nonce, t.State, t.BlockHash)), nil + } + + // When tx creates contract + return []byte(fmt.Sprintf(`{"hash":%q,"from":%q,"contract":%q,"value":%q,"data":%q,"gas":%d,"gasPrice":%q,"cost":%q,"nonce":%d,"state":%d,"blockHash":%q}`, + t.Hash, t.From, t.Contract, t.Value, + data, t.Gas, t.GasPrice, t.Cost, t.Nonce, t.State, t.BlockHash)), nil + +} + +// ToJSON - JSON encoder, to be invoked before delivering tx query data to client +func (t *Transaction) ToJSON() []byte { + + data, err := json.Marshal(t) + if err != nil { + log.Printf("[!] Failed to encode transaction data to JSON : %s\n", err.Error()) + return nil + } + + return data + +} + +// Transactions - Multiple transactions holder struct +type Transactions struct { + Transactions []*Transaction `json:"transactions"` +} + +// ToJSON - Encoding into JSON, to be invoked when delivering to client +func (t *Transactions) ToJSON() []byte { + + data, err := json.Marshal(t) + if err != nil { + log.Printf("[!] Failed to encode transactions data to JSON : %s\n", err.Error()) + return nil + } + + return data + +} diff --git a/app/db/block.go b/app/db/block.go index a633ec4b..5cfd570f 100644 --- a/app/db/block.go +++ b/app/db/block.go @@ -5,6 +5,7 @@ import ( "log" d "github.com/itzmeanjan/ette/app/data" + q "github.com/itzmeanjan/ette/app/queue" "gorm.io/gorm" ) @@ -20,10 +21,10 @@ import ( // // 👆 gives us performance improvement, also taste of atomic db operation // i.e. either whole block data is written or nothing is written -func StoreBlock(dbWOTx *gorm.DB, block *PackedBlock, status *d.StatusHolder) error { +func StoreBlock(dbWOTx *gorm.DB, block *PackedBlock, status *d.StatusHolder, queue *q.BlockProcessorQueue) error { if block == nil { - return errors.New("Empty block received while attempting to persist") + return errors.New("empty block received while attempting to persist") } // -- Starting DB transaction @@ -97,8 +98,9 @@ func StoreBlock(dbWOTx *gorm.DB, block *PackedBlock, status *d.StatusHolder) err // During 👆 flow, if we've really inserted a new block into database, // count will get updated - if blockInserted && status != nil { - status.IncrementBlocksInserted() + if blockInserted && status != nil && queue != nil { + status.IncrementBlocksInserted() // @note This is to be removed + queue.Inserted(block.Block.Number) } return nil diff --git a/app/db/event.go b/app/db/event.go index be183270..d12153f7 100644 --- a/app/db/event.go +++ b/app/db/event.go @@ -17,7 +17,7 @@ import ( func UpsertEvent(dbWTx *gorm.DB, event *Events) error { if event == nil { - return errors.New("Empty event received while attempting to persist") + return errors.New("empty event received while attempting to persist") } return dbWTx.Clauses(clause.OnConflict{UpdateAll: true}).Create(event).Error diff --git a/app/db/model.go b/app/db/model.go index bee5ec8a..22d2e1c8 100644 --- a/app/db/model.go +++ b/app/db/model.go @@ -56,7 +56,7 @@ func (b *Blocks) SimilarTo(_b *Blocks) bool { b.UncleHash == _b.UncleHash && b.TransactionRootHash == _b.TransactionRootHash && b.ReceiptRootHash == _b.ReceiptRootHash && - bytes.Compare(b.ExtraData, _b.ExtraData) == 0 + bytes.Equal(b.ExtraData, _b.ExtraData) } // Transactions - Blockchain transaction holder table model diff --git a/app/db/transaction.go b/app/db/transaction.go index 036240f7..4175ffd8 100644 --- a/app/db/transaction.go +++ b/app/db/transaction.go @@ -17,7 +17,7 @@ import ( func UpsertTransaction(dbWTx *gorm.DB, tx *Transactions) error { if tx == nil { - return errors.New("Empty transaction received while attempting to persist") + return errors.New("empty transaction received while attempting to persist") } return dbWTx.Clauses(clause.OnConflict{UpdateAll: true}).Create(tx).Error diff --git a/app/queue/queue.go b/app/queue/queue.go new file mode 100644 index 00000000..2591f3ba --- /dev/null +++ b/app/queue/queue.go @@ -0,0 +1,621 @@ +package queue + +import ( + "context" + "math" + "time" + + "github.com/itzmeanjan/ette/app/config" +) + +// Block - Keeps track of single block i.e. how many +// times attempted till date, last attempted to process +// whether block data has been published on pubsub topic or not, +// is block processing currently +type Block struct { + UnconfirmedProgress bool // 1. Joins + Published bool // 2. Pub/Sub publishing + UnconfirmedDone bool // 3. Done with processing + ConfirmedProgress bool // 4. Attempting confirm whether chain reorg happened or not + ConfirmedDone bool // 5. Done with bringing latest changes ✅ + LastAttempted time.Time + Delay time.Duration +} + +// SetDelay - Set delay at next fibonacci number in series, interpreted as seconds +func (b *Block) SetDelay() { + b.Delay = time.Duration(int64(math.Round(b.Delay.Seconds()*(1.0+math.Sqrt(5.0))/2))%3600) * time.Second +} + +// ResetDelay - Reset delay back to 1 second +func (b *Block) ResetDelay() { + b.Delay = time.Duration(1) * time.Second +} + +// SetLastAttempted - Updates last attempted to process block +// to current UTC time +func (b *Block) SetLastAttempted() { + b.LastAttempted = time.Now().UTC() +} + +// CanAttempt - Can we attempt to process this block ? +// +// Yes, if waiting phase has elapsed +func (b *Block) CanAttempt() bool { + return time.Now().UTC().After(b.LastAttempted.Add(b.Delay)) +} + +// Request - Any request to be placed into +// queue's channels in this form, so that client +// can also receive response/ confirmation over channel +// that they specify +type Request struct { + BlockNumber uint64 + ResponseChan chan bool +} + +type Update Request + +// Next - Block to be processed next, asked +// by sending this request & when receptor +// detects so, will attempt to find out +// what should be next processed & send that block +// number is response over channel specified by client +type Next struct { + ResponseChan chan struct { + Status bool + Number uint64 + } +} + +// Stat - Clients can query how many blocks present +// in queue currently +type Stat struct { + ResponseChan chan StatResponse +} + +// StatResponse - Statistics of queue to be +// responded back to client in this form +type StatResponse struct { + UnconfirmedProgress uint64 + UnconfirmedWaiting uint64 + ConfirmedProgress uint64 + ConfirmedWaiting uint64 + Total uint64 +} + +// BlockProcessorQueue - To be interacted with before attempting to +// process any block +// +// It's concurrent safe +type BlockProcessorQueue struct { + Blocks map[uint64]*Block + StartedWith uint64 + TotalInserted uint64 + LatestBlock uint64 + Total uint64 + PutChan chan Request + CanPublishChan chan Request + PublishedChan chan Request + InsertedChan chan Request + UnconfirmedFailedChan chan Request + UnconfirmedDoneChan chan Request + ConfirmedFailedChan chan Request + ConfirmedDoneChan chan Request + StatChan chan Stat + LatestChan chan Update + UnconfirmedNextChan chan Next + ConfirmedNextChan chan Next +} + +// New - Getting new instance of queue, to be +// invoked during setting up application +func New(startingWith uint64) *BlockProcessorQueue { + + return &BlockProcessorQueue{ + Blocks: make(map[uint64]*Block), + StartedWith: startingWith, + TotalInserted: 0, + LatestBlock: 0, + Total: 0, + PutChan: make(chan Request, 128), + CanPublishChan: make(chan Request, 128), + PublishedChan: make(chan Request, 128), + InsertedChan: make(chan Request, 128), + UnconfirmedFailedChan: make(chan Request, 128), + UnconfirmedDoneChan: make(chan Request, 128), + ConfirmedFailedChan: make(chan Request, 128), + ConfirmedDoneChan: make(chan Request, 128), + StatChan: make(chan Stat, 1), + LatestChan: make(chan Update, 1), + UnconfirmedNextChan: make(chan Next, 1), + ConfirmedNextChan: make(chan Next, 1), + } + +} + +// Put - Client is supposed to be invoking this method +// when it's interested in putting new block to processing queue +// +// If responded with `true`, they're good to go with execution of +// processing of this block +// +// If this block is already put into queue, it'll ask client +// to not proceed with this number +func (b *BlockProcessorQueue) Put(block uint64) bool { + + resp := make(chan bool) + req := Request{ + BlockNumber: block, + ResponseChan: resp, + } + + b.PutChan <- req + return <-resp + +} + +// CanPublish - Before any client attempts to publish any block +// on Pub/Sub topic, they're supposed to be invoking this method +// to check whether they're eligible of publishing or not +// +// Actually if any other client has already published it, we'll +// better avoid redoing it +func (b *BlockProcessorQueue) CanPublish(block uint64) bool { + + resp := make(chan bool) + req := Request{ + BlockNumber: block, + ResponseChan: resp, + } + + b.CanPublishChan <- req + return <-resp + +} + +// Published - Asks queue manager to mark that this block has been +// successfully published on Pub/Sub topic +// +// Future block processing attempts ( if any ), are supposed to be +// avoiding doing this, if already done successfully +func (b *BlockProcessorQueue) Published(block uint64) bool { + + resp := make(chan bool) + req := Request{ + BlockNumber: block, + ResponseChan: resp, + } + + b.PublishedChan <- req + return <-resp + +} + +// Inserted - Marking this block has been inserted into DB ( not updation, it's insertion ) +func (b *BlockProcessorQueue) Inserted(block uint64) bool { + + resp := make(chan bool) + req := Request{ + BlockNumber: block, + ResponseChan: resp, + } + + b.InsertedChan <- req + return <-resp + +} + +// UnconfirmedFailed - Unconfirmed block processing failed +func (b *BlockProcessorQueue) UnconfirmedFailed(block uint64) bool { + + resp := make(chan bool) + req := Request{ + BlockNumber: block, + ResponseChan: resp, + } + + b.UnconfirmedFailedChan <- req + return <-resp + +} + +// UnconfirmedDone - Unconfirmed block processed successfully +func (b *BlockProcessorQueue) UnconfirmedDone(block uint64) bool { + + resp := make(chan bool) + req := Request{ + BlockNumber: block, + ResponseChan: resp, + } + + b.UnconfirmedDoneChan <- req + return <-resp + +} + +// ConfirmedFailed - Confirmed block processing failed +func (b *BlockProcessorQueue) ConfirmedFailed(block uint64) bool { + + resp := make(chan bool) + req := Request{ + BlockNumber: block, + ResponseChan: resp, + } + + b.ConfirmedFailedChan <- req + return <-resp + +} + +// ConfirmedDone - Confirmed block processed successfully +func (b *BlockProcessorQueue) ConfirmedDone(block uint64) bool { + + resp := make(chan bool) + req := Request{ + BlockNumber: block, + ResponseChan: resp, + } + + b.ConfirmedDoneChan <- req + return <-resp + +} + +// Stat - Client's are supposed to be invoking this abstracted method +// for checking queue status +func (b *BlockProcessorQueue) Stat() StatResponse { + + resp := make(chan StatResponse) + req := Stat{ResponseChan: resp} + + b.StatChan <- req + return <-resp + +} + +// Latest - Block head subscriber will update queue manager +// that latest block seen is updated +func (b *BlockProcessorQueue) Latest(num uint64) bool { + + resp := make(chan bool) + udt := Update{BlockNumber: num, ResponseChan: resp} + + b.LatestChan <- udt + return <-resp + +} + +// UnconfirmedNext - Next block that can be processed, present in unconfirmed block queue +func (b *BlockProcessorQueue) UnconfirmedNext() (uint64, bool) { + + resp := make(chan struct { + Status bool + Number uint64 + }) + req := Next{ResponseChan: resp} + + b.UnconfirmedNextChan <- req + + v := <-resp + return v.Number, v.Status + +} + +// ConfirmedNext - Next block that can be processed, to get confirmation & finalised +func (b *BlockProcessorQueue) ConfirmedNext() (uint64, bool) { + + resp := make(chan struct { + Status bool + Number uint64 + }) + req := Next{ResponseChan: resp} + + b.ConfirmedNextChan <- req + + v := <-resp + return v.Number, v.Status + +} + +// CanBeConfirmed -Checking whether given block number has reached +// finality as per given user set preference, then it can be attempted +// to be checked again & finally entered into storage +func (b *BlockProcessorQueue) CanBeConfirmed(num uint64) bool { + + if b.LatestBlock < config.GetBlockConfirmations() { + return false + } + + return b.LatestBlock-config.GetBlockConfirmations() >= num + +} + +// Start - You're supposed to be starting this method as an +// independent go routine, with will listen on multiple channels +// & respond back over provided channel ( by client ) +func (b *BlockProcessorQueue) Start(ctx context.Context) { + + for { + select { + + case <-ctx.Done(): + return + + case req := <-b.PutChan: + + // Once a block is inserted into processing queue, don't + // overwrite its history with some new request + if _, ok := b.Blocks[req.BlockNumber]; ok { + + req.ResponseChan <- false + break + + } + + b.Blocks[req.BlockNumber] = &Block{ + UnconfirmedProgress: true, + LastAttempted: time.Now().UTC(), + Delay: time.Duration(1) * time.Second, + } + req.ResponseChan <- true + + case req := <-b.CanPublishChan: + + block, ok := b.Blocks[req.BlockNumber] + if !ok { + req.ResponseChan <- false + break + } + + req.ResponseChan <- !block.Published + + case req := <-b.PublishedChan: + // Worker go rountine marks this block has been + // published i.e. doesn't denote it has been processed + // successfully + // + // If not, it'll be marked so & no future attempt + // should try to publish it again over Pub/Sub + + block, ok := b.Blocks[req.BlockNumber] + if !ok { + req.ResponseChan <- false + break + } + + block.Published = true + req.ResponseChan <- true + + case req := <-b.InsertedChan: + // Increments how many blocks were inserted into DB + + if _, ok := b.Blocks[req.BlockNumber]; !ok { + req.ResponseChan <- false + break + } + + b.TotalInserted++ + req.ResponseChan <- true + + case req := <-b.UnconfirmedFailedChan: + + block, ok := b.Blocks[req.BlockNumber] + if !ok { + req.ResponseChan <- false + break + } + + block.UnconfirmedProgress = false + block.SetDelay() + + req.ResponseChan <- true + + case req := <-b.UnconfirmedDoneChan: + + block, ok := b.Blocks[req.BlockNumber] + if !ok { + req.ResponseChan <- false + break + } + + block.UnconfirmedProgress = false + block.UnconfirmedDone = true + + if config.Get("EtteMode") == "1" || config.Get("EtteMode") == "3" { + block.ConfirmedDone = b.CanBeConfirmed(req.BlockNumber) + } else { + block.ConfirmedDone = true // No need to attain this, because we're not putting anything in DB + } + + block.ResetDelay() + block.SetLastAttempted() + + req.ResponseChan <- true + + case req := <-b.ConfirmedFailedChan: + + block, ok := b.Blocks[req.BlockNumber] + if !ok { + req.ResponseChan <- false + break + } + + block.ConfirmedProgress = false + block.SetDelay() + + req.ResponseChan <- true + + case req := <-b.ConfirmedDoneChan: + + block, ok := b.Blocks[req.BlockNumber] + if !ok { + req.ResponseChan <- false + break + } + + block.ConfirmedProgress = false + block.ConfirmedDone = true + + req.ResponseChan <- true + + case nxt := <-b.UnconfirmedNextChan: + + // This is the block number which should be processed + // by requester client, which is attempted to be found + var selected uint64 + // Whether we've found anything or not + var found bool + + for k := range b.Blocks { + + if b.Blocks[k].ConfirmedDone || b.Blocks[k].ConfirmedProgress { + continue + } + + if b.Blocks[k].UnconfirmedDone || b.Blocks[k].UnconfirmedProgress { + continue + } + + if b.Blocks[k].CanAttempt() { + selected = k + found = true + + break + } + + } + + if !found { + + // As we've failed to find any block which can be processed + // now, we're asking client to come back sometime later + // + // When to come back is upto client + nxt.ResponseChan <- struct { + Status bool + Number uint64 + }{ + Status: false, + } + break + + } + + // Updated when last this block was attempted to be processed + b.Blocks[selected].SetLastAttempted() + b.Blocks[selected].UnconfirmedProgress = true + + // Asking client to proceed with processing of this block + nxt.ResponseChan <- struct { + Status bool + Number uint64 + }{ + Status: true, + Number: selected, + } + + case nxt := <-b.ConfirmedNextChan: + + var selected uint64 + var found bool + + for k := range b.Blocks { + + if b.Blocks[k].ConfirmedDone || b.Blocks[k].ConfirmedProgress { + continue + } + + if !b.Blocks[k].UnconfirmedDone { + continue + } + + if b.Blocks[k].CanAttempt() && b.CanBeConfirmed(k) { + selected = k + found = true + + break + } + + } + + if !found { + + nxt.ResponseChan <- struct { + Status bool + Number uint64 + }{ + Status: false, + } + break + + } + + b.Blocks[selected].SetLastAttempted() + b.Blocks[selected].ConfirmedProgress = true + + nxt.ResponseChan <- struct { + Status bool + Number uint64 + }{ + Status: true, + Number: selected, + } + + case req := <-b.StatChan: + + // Returning back how many blocks currently living + // in block processor queue & in what state + var stat StatResponse + + for k := range b.Blocks { + + if b.Blocks[k].UnconfirmedProgress { + stat.UnconfirmedProgress++ + continue + } + + if b.Blocks[k].UnconfirmedProgress == b.Blocks[k].UnconfirmedDone { + stat.UnconfirmedWaiting++ + continue + } + + if b.Blocks[k].ConfirmedProgress { + stat.ConfirmedProgress++ + continue + } + + if b.Blocks[k].ConfirmedProgress == b.Blocks[k].ConfirmedDone { + stat.ConfirmedWaiting++ + continue + } + + } + + stat.Total = b.Total + req.ResponseChan <- stat + + case udt := <-b.LatestChan: + // Latest block number seen by subscriber to + // sent to queue, to be used in when deciding whether some + // block is confirmed/ finalised or not + b.LatestBlock = udt.BlockNumber + udt.ResponseChan <- true + + case <-time.After(time.Duration(100) * time.Millisecond): + + // Finding out which blocks are confirmed & we're good to + // clean those up + for k := range b.Blocks { + + if b.Blocks[k].ConfirmedDone { + delete(b.Blocks, k) + b.Total++ // Successfully processed #-of blocks + } + + } + + } + } + +} diff --git a/app/setup.go b/app/setup.go new file mode 100644 index 00000000..1b0c6cad --- /dev/null +++ b/app/setup.go @@ -0,0 +1,76 @@ +package app + +import ( + "context" + "log" + "sync" + + "github.com/go-redis/redis/v8" + cfg "github.com/itzmeanjan/ette/app/config" + d "github.com/itzmeanjan/ette/app/data" + "github.com/itzmeanjan/ette/app/db" + q "github.com/itzmeanjan/ette/app/queue" + "github.com/itzmeanjan/ette/app/rest/graph" + "gorm.io/gorm" +) + +// Setting ground up i.e. acquiring resources required & determining with +// some basic checks whether we can proceed to next step or not +func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConnection, *redis.Client, *d.RedisInfo, *gorm.DB, *d.StatusHolder, *q.BlockProcessorQueue) { + + err := cfg.Read(configFile) + if err != nil { + log.Fatalf("[!] Failed to read `.env` : %s\n", err.Error()) + } + + if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "2" || cfg.Get("EtteMode") == "3" || cfg.Get("EtteMode") == "4" || cfg.Get("EtteMode") == "5") { + log.Fatalf("[!] Failed to find `EtteMode` in configuration file\n") + } + + // Maintaining both HTTP & Websocket based connection to blockchain + _connection := &d.BlockChainNodeConnection{ + RPC: getClient(true), + Websocket: getClient(false), + } + + _redisClient := getRedisClient() + + if _redisClient == nil { + log.Fatalf("[!] Failed to connect to Redis Server\n") + } + + if err := _redisClient.FlushAll(context.Background()).Err(); err != nil { + log.Printf("[!] Failed to flush all keys from redis : %s\n", err.Error()) + } + + _db := db.Connect() + + // Populating subscription plans from `.plans.json` into + // database table, at application start up + db.PersistAllSubscriptionPlans(_db, subscriptionPlansFile) + + // Passing db handle, to graph package, so that it can be used + // for resolving graphQL queries + graph.GetDatabaseConnection(_db) + + _status := &d.StatusHolder{ + State: &d.SyncState{ + BlockCountAtStartUp: db.GetBlockCount(_db), + MaxBlockNumberAtStartUp: db.GetCurrentBlockNumber(_db), + }, + Mutex: &sync.RWMutex{}, + } + + _redisInfo := &d.RedisInfo{ + Client: _redisClient, + BlockPublishTopic: "block", + TxPublishTopic: "transaction", + EventPublishTopic: "event", + } + + // This is block processor queue + _queue := q.New(db.GetCurrentBlockNumber(_db)) + + return _connection, _redisClient, _redisInfo, _db, _status, _queue + +} diff --git a/app/snapshot/read.go b/app/snapshot/read.go index 1d8c1adc..045ca549 100644 --- a/app/snapshot/read.go +++ b/app/snapshot/read.go @@ -136,7 +136,7 @@ func ProcessBlock(db *gorm.DB, data []byte, control chan bool) { // easily used for persisting whole block data into DB _block := ProtoBufToBlock(block) - if err := _db.StoreBlock(db, _block, nil); err != nil { + if err := _db.StoreBlock(db, _block, nil, nil); err != nil { log.Printf("[!] Failed to restore block : %s\n", err.Error())