diff --git a/README.md b/README.md index 7214cb7e..df89cf13 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,8 @@ And that's `ette` > Note : Redis **v6.0.6** is required +> Note : Setting password in Redis instance has been made optional from now on, though it's recommended. + - Blockchain Node's both **HTTP & Websocket** connection URL required, because we'll be querying block, transaction, event log related data using HTTP interface & listening for block mining events in real time over Websocket. ## Installation 🛠 @@ -98,6 +100,12 @@ cd ette - For processing block(s)/ tx(s) concurrently, it'll create `ConcurrencyFactor * #-of CPUs on machine` workers, who will pick up jobs submitted to them. - If nothing is specified, it defaults to 1 & assuming you're running `ette` on machine with 4 CPUs, it'll spawn worker pool of size 4. But more number of jobs can be submitted, only 4 can be running at max. - 👆 being done for controlling concurrency level, by putting more control on user's hand. + - If you want to persist blocks in delayed fashion, you might consider setting `BlockConfirmations` to some _number > 0_. + - That will make `ette` think you're asking it 80 is latest block, which can be persisted in final data store, when latest mined block number is 100 & `BlockConfirmations` is set to 20. + - This option is **recommended** to be used, at least in production. + - Skipping `RedisPassword` is absolutely fine, if you don't want to use any password in Redis instance. [ **Not recommended** ] + - For range based queries `BlockRange` can be set to limit how many blocks can be queried by client in a single go. Default value 100. + - For time span based queries `TimeRange` can be set to put limit on max time span _( in terms of second )_, can be used by clients. Default value 3600 i.e. 1 hour. ``` RPCUrl=https:// @@ -117,6 +125,9 @@ Production=yes EtteMode=3 EtteGraphQLPlayGround=yes ConcurrencyFactor=2 +BlockConfirmations=20 +BlockRange=1000 +TimeRange=21600 ``` - Create another file in same directory, named `.plans.json`, whose content will look like 👇. diff --git a/app/app.go b/app/app.go index ed0bcfbe..f1954265 100644 --- a/app/app.go +++ b/app/app.go @@ -35,7 +35,11 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne Websocket: getClient(false), } - _redisClient := getPubSubClient() + _redisClient := getRedisClient() + + if _redisClient == nil { + log.Fatalf("[!] Failed to connect to Redis Server\n") + } if err := _redisClient.FlushAll(context.Background()).Err(); err != nil { log.Printf("[!] Failed to flush all keys from redis : %s\n", err.Error()) @@ -63,8 +67,9 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne func Run(configFile, subscriptionPlansFile string) { _connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile) _redisInfo := d.RedisInfo{ - Client: _redisClient, - QueueName: "blocks", + Client: _redisClient, + BlockRetryQueueName: "blocks_in_retry_queue", + UnfinalizedBlocksQueueName: "unfinalized_blocks", } // Attempting to listen to Ctrl+C signal diff --git a/app/block/block.go b/app/block/block.go index 33bdb672..a5c4efd0 100644 --- a/app/block/block.go +++ b/app/block/block.go @@ -1,6 +1,7 @@ package block import ( + "fmt" "log" "runtime" @@ -14,6 +15,14 @@ import ( "gorm.io/gorm" ) +// HasBlockFinalized - Checking whether block under processing i.e. 
`number` +// has `N` confirmations on top of it or not +func HasBlockFinalized(status *d.StatusHolder, number uint64) bool { + + return status.GetLatestBlockNumber()-cfg.GetBlockConfirmations() >= number + +} + // ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder) { @@ -40,11 +49,24 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // This is what we just published on pubsub channel packedBlock := pubsubWorker(nil) + if !HasBlockFinalized(status, packedBlock.Block.Number) { + + log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) + + // Pushing into unfinalized block queue, to be picked up only when + // finality for this block has been achieved + PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number)) + return + + } + // If block doesn't contain any tx, we'll attempt to persist only block if err := db.StoreBlock(_db, packedBlock, status); err != nil { + log.Print(color.Red.Sprintf("[+] Failed to process block %d with 0 tx(s) : %s", block.NumberU64(), err.Error())) + // If failed to persist, we'll put it in retry queue - pushBlockHashIntoRedisQueue(redis, block.Number().String()) + PushBlockIntoRetryQueue(redis, block.Number().String()) return } @@ -127,8 +149,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // we're exiting from this context, while putting this block number in retry queue if !(result.Failure == 0) { - // If failed to persist, we'll put it in retry queue - pushBlockHashIntoRedisQueue(redis, block.Number().String()) + PushBlockIntoRetryQueue(redis, block.Number().String()) return } @@ -138,11 +159,24 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm // This is what we just published on pubsub channel packedBlock := pubsubWorker(packedTxs) + if !HasBlockFinalized(status, packedBlock.Block.Number) { + + log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) + + // Pushing into unfinalized block queue, to be picked up only when + // finality for this block has been achieved + PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number)) + return + + } + // If block doesn't contain any tx, we'll attempt to persist only block if err := db.StoreBlock(_db, packedBlock, status); err != nil { + log.Print(color.Red.Sprintf("[+] Failed to process block %d with %d tx(s) : %s", block.NumberU64(), block.Transactions().Len(), err.Error())) + // If failed to persist, we'll put it in retry queue - pushBlockHashIntoRedisQueue(redis, block.Number().String()) + PushBlockIntoRetryQueue(redis, block.Number().String()) return } diff --git a/app/block/fetch.go b/app/block/fetch.go index cd31addf..74cac1b4 100644 --- a/app/block/fetch.go +++ b/app/block/fetch.go @@ -20,7 +20,7 @@ func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string, block, err := client.BlockByHash(context.Background(), hash) if err != nil { // Pushing block number into Redis queue for retrying later - pushBlockHashIntoRedisQueue(redis, number) + 
PushBlockIntoRetryQueue(redis, number) log.Print(color.Red.Sprintf("[!] Failed to fetch block by hash [ block : %s] : %s", number, err.Error())) return @@ -37,7 +37,7 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r block, err := client.BlockByNumber(context.Background(), _num) if err != nil { // Pushing block number into Redis queue for retrying later - pushBlockHashIntoRedisQueue(redis, fmt.Sprintf("%d", number)) + PushBlockIntoRetryQueue(redis, fmt.Sprintf("%d", number)) log.Print(color.Red.Sprintf("[!] Failed to fetch block by number [ block : %d ] : %s", number, err)) return diff --git a/app/block/listener.go b/app/block/listener.go index 2cdc4990..3b235153 100644 --- a/app/block/listener.go +++ b/app/block/listener.go @@ -48,6 +48,11 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, log.Fatal(color.Red.Sprintf("[!] Listener stopped : %s", err.Error())) break case header := <-headerChan: + + // Latest block number seen, is getting safely updated, as + // soon as new block mined data gets propagated to network + status.SetLatestBlockNumber(header.Number.Uint64()) + if first { // Starting now, to be used for calculating system performance, uptime etc. @@ -60,19 +65,13 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // blocks from highest block number it fetched last time to current network block number // i.e. trying to fill up gap, which was caused when `ette` was offline // - // But in reverse direction i.e. from 100 to 50, where `ette` fetched upto 50 last time & 100 - // is latest block, got mined in network - // - // Yes it's going refetch 50, due to the fact, some portions of 50 might be missed in last try - // So, it'll check & decide whether persisting again is required or not - // - // This backward traversal mechanism gives us more recent blockchain happenings to cover + // Backward traversal mechanism gives us more recent blockchain happenings to cover go SyncBlocksByRange(connection.RPC, _db, redis, header.Number.Uint64()-1, currentHighestBlockNumber, status) // Starting go routine for fetching blocks `ette` failed to process in previous attempt // // Uses Redis backed queue for fetching pending block hash & retries - go retryBlockFetching(connection.RPC, _db, redis, status) + go RetryQueueManager(connection.RPC, _db, redis, status) // Making sure on when next latest block header is received, it'll not // start another syncer @@ -93,6 +92,41 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB, // so that it gets processed immediately func(blockHash common.Hash, blockNumber string) { + // Attempting to submit all blocks to job processor queue + // if more blocks are present in non-final queue, than actually + // should be + for GetUnfinalizedQueueLength(redis) > int64(cfg.GetBlockConfirmations()) { + + // Before submitting new block processing job + // checking whether there exists any block in unfinalized + // block queue or not + // + // If yes, we're attempting to process it, because it has now + // achieved enough confirmations + if CheckIfOldestBlockIsConfirmed(redis, status) { + + oldest := PopOldestBlockFromUnfinalizedQueue(redis) + + log.Print(color.Yellow.Sprintf("[*] Attempting to process finalised block %d [ Latest Block : %d | In Queue : %d ]", oldest, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) + + wp.Submit(func() { + + FetchBlockByNumber(connection.RPC, + oldest, + _db, + redis, + status) + + }) + + } else { 
+ // If oldest block is not finalized, no meaning + // staying here, we'll revisit it some time in future + break + } + + } + wp.Submit(func() { FetchBlockByHash(connection.RPC, diff --git a/app/block/publish.go b/app/block/publish.go index cd3ffc01..db7fce67 100644 --- a/app/block/publish.go +++ b/app/block/publish.go @@ -12,6 +12,10 @@ import ( // PublishBlock - Attempts to publish block data to Redis pubsub channel func PublishBlock(block *db.PackedBlock, redis *d.RedisInfo) { + if block == nil { + return + } + if err := redis.Client.Publish(context.Background(), "block", &d.Block{ Hash: block.Block.Hash, Number: block.Block.Number, diff --git a/app/block/retry.go b/app/block/retry.go index 9293c825..f80ec495 100644 --- a/app/block/retry.go +++ b/app/block/retry.go @@ -17,14 +17,15 @@ import ( "gorm.io/gorm" ) -// Pop oldest block number from Redis queue & try to fetch it in different go routine +// RetryQueueManager - Pop oldest block number from Redis backed retry +// queue & try to fetch it in different go routine // // Sleeps for 1000 milliseconds // // Keeps repeating -func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) { +func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) { sleep := func() { - time.Sleep(time.Duration(500) * time.Millisecond) + time.Sleep(time.Duration(1000) * time.Millisecond) } // Creating worker pool and submitting jobs as soon as it's determined @@ -36,7 +37,7 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.Redi sleep() // Popping oldest element from Redis queue - blockNumber, err := redis.Client.LPop(context.Background(), redis.QueueName).Result() + blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueueName).Result() if err != nil { continue } @@ -47,7 +48,7 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.Redi continue } - log.Print(color.Cyan.Sprintf("[~] Retrying block : %d [ In Queue : %d ]", parsedBlockNumber, getRetryQueueLength(redis))) + log.Print(color.Cyan.Sprintf("[~] Retrying block : %d [ In Queue : %d ]", parsedBlockNumber, GetRetryQueueLength(redis))) // Submitting block processor job into pool // which will be picked up & processed @@ -67,39 +68,40 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.Redi } } -// Pushes failed to fetch block number at end of Redis queue +// PushBlockIntoRetryQueue - Pushes failed to fetch block number at end of Redis queue // given it has not already been added -func pushBlockHashIntoRedisQueue(redis *data.RedisInfo, blockNumber string) { +func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) { // Checking presence first & then deciding whether to add it or not - if !checkExistenceOfBlockNumberInRedisQueue(redis, blockNumber) { + if !CheckBlockInRetryQueue(redis, blockNumber) { - if err := redis.Client.RPush(context.Background(), redis.QueueName, blockNumber).Err(); err != nil { - log.Print(color.Red.Sprintf("[!] Failed to push block %s : %s", blockNumber, err.Error())) + if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueueName, blockNumber).Result(); err != nil { + log.Print(color.Red.Sprintf("[!] 
Failed to push block %s into retry queue : %s", blockNumber, err.Error())) } } } -// Checks whether block number is already added in Redis backed retry queue or not +// CheckBlockInRetryQueue - Checks whether block number is already added in +// Redis backed retry queue or not // // If yes, it'll not be added again // // Note: this feature of checking index of value in redis queue, // was added in Redis v6.0.6 : https://redis.io/commands/lpos -func checkExistenceOfBlockNumberInRedisQueue(redis *data.RedisInfo, blockNumber string) bool { - if _, err := redis.Client.LPos(context.Background(), redis.QueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil { +func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool { + if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil { return false } return true } -// Returns redis backed retry queue length -func getRetryQueueLength(redis *data.RedisInfo) int64 { +// GetRetryQueueLength - Returns redis backed retry queue length +func GetRetryQueueLength(redis *data.RedisInfo) int64 { - blockCount, err := redis.Client.LLen(context.Background(), redis.QueueName).Result() + blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueueName).Result() if err != nil { - log.Printf(color.Red.Sprintf("[!] Failed to determine Redis queue length : %s", err.Error())) + log.Printf(color.Red.Sprintf("[!] Failed to determine retry queue length : %s", err.Error())) } return blockCount diff --git a/app/block/syncer.go b/app/block/syncer.go index 3b983c0e..a898703b 100644 --- a/app/block/syncer.go +++ b/app/block/syncer.go @@ -69,8 +69,20 @@ func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redis *data.Redis // // Job specification is provided in `Job` struct job := func(wp *workerpool.WorkerPool, j *d.Job) { + wp.Submit(func() { + if !HasBlockFinalized(status, j.Block) { + + log.Print(color.LightRed.Sprintf("[x] Non-final block %d [ Latest Block : %d | In Queue : %d ]", j.Block, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) + + // Pushing into unfinalized block queue, to be picked up only when + // finality for this block has been achieved + PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", j.Block)) + return + + } + FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status) }) @@ -124,11 +136,23 @@ func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redis *data.R // // Job specification is provided in `Job` struct job := func(wp *workerpool.WorkerPool, j *d.Job) { + wp.Submit(func() { + if !HasBlockFinalized(status, j.Block) { + + log.Print(color.LightRed.Sprintf("[x] Non-final block %d [ Latest Block : %d | In Queue : %d ]", j.Block, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis))) + + // Pushing into unfinalized block queue, to be picked up only when + // finality for this block has been achieved + PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", j.Block)) + return + + } + // Worker fetches block by number from local storage block := db.GetBlock(j.DB, j.Block) - if block == nil && !checkExistenceOfBlockNumberInRedisQueue(redis, fmt.Sprintf("%d", j.Block)) { + if block == nil && !CheckBlockInRetryQueue(redis, fmt.Sprintf("%d", j.Block)) { // If not found, block fetching cycle is run, for this block FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status) } diff --git a/app/block/unfinalized_blocks.go b/app/block/unfinalized_blocks.go new file mode 100644 index 
00000000..39994c0f --- /dev/null +++ b/app/block/unfinalized_blocks.go @@ -0,0 +1,101 @@ +package block + +import ( + "context" + "log" + "strconv" + + _redis "github.com/go-redis/redis/v8" + "github.com/gookit/color" + "github.com/itzmeanjan/ette/app/data" +) + +// GetOldestBlockFromUnfinalizedQueue - Attempts to read left most element of Redis backed non-final block queue +// i.e. element at 0th index +func GetOldestBlockFromUnfinalizedQueue(redis *data.RedisInfo) string { + + blockNumber, err := redis.Client.LIndex(context.Background(), redis.UnfinalizedBlocksQueueName, 0).Result() + if err != nil { + return "" + } + + return blockNumber + +} + +// CheckIfOldestBlockIsConfirmed - Given oldest block number present in redis backed unfinalized queue +// checks whether this block has yet reached finality or not +func CheckIfOldestBlockIsConfirmed(redis *data.RedisInfo, status *data.StatusHolder) bool { + + oldest := GetOldestBlockFromUnfinalizedQueue(redis) + if oldest == "" { + return false + } + + parsedOldestBlockNumber, err := strconv.ParseUint(oldest, 10, 64) + if err != nil { + return false + } + + return HasBlockFinalized(status, parsedOldestBlockNumber) + +} + +// PopOldestBlockFromUnfinalizedQueue - Pops oldest block i.e. left most block from non-final block number +// queue, which can be processed now +func PopOldestBlockFromUnfinalizedQueue(redis *data.RedisInfo) uint64 { + + blockNumber, err := redis.Client.LPop(context.Background(), redis.UnfinalizedBlocksQueueName).Result() + if err != nil { + return 0 + } + + parsedBlockNumber, err := strconv.ParseUint(blockNumber, 10, 64) + if err != nil { + return 0 + } + + return parsedBlockNumber + +} + +// PushBlockIntoUnfinalizedQueue - Pushes block number, which has not yet reached finality, into +// Redis backed queue, which will be poped out only when `N` block +// confirmations achieved on top of it +func PushBlockIntoUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) { + // Checking presence first & then deciding whether to add it or not + if !CheckBlockInUnfinalizedQueue(redis, blockNumber) { + + if _, err := redis.Client.RPush(context.Background(), redis.UnfinalizedBlocksQueueName, blockNumber).Result(); err != nil { + log.Print(color.Red.Sprintf("[!] Failed to push block %s into non-final block queue : %s", blockNumber, err.Error())) + } + + } +} + +// CheckBlockInUnfinalizedQueue - Checks whether block number is already added in +// Redis backed unfinalized queue or not +// +// If yes, it'll not be added again +// +// Note: this feature of checking index of value in redis queue, +// was added in Redis v6.0.6 : https://redis.io/commands/lpos +func CheckBlockInUnfinalizedQueue(redis *data.RedisInfo, blockNumber string) bool { + if _, err := redis.Client.LPos(context.Background(), redis.UnfinalizedBlocksQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil { + return false + } + + return true +} + +// GetUnfinalizedQueueLength - Returns redis backed unfinalized block number queue length +func GetUnfinalizedQueueLength(redis *data.RedisInfo) int64 { + + blockCount, err := redis.Client.LLen(context.Background(), redis.UnfinalizedBlocksQueueName).Result() + if err != nil { + log.Printf(color.Red.Sprintf("[!] 
Failed to determine non-final block queue length : %s", err.Error())) + } + + return blockCount + +} diff --git a/app/client.go b/app/client.go index 79fc2da6..9f68eb0d 100644 --- a/app/client.go +++ b/app/client.go @@ -1,6 +1,7 @@ package app import ( + "context" "log" "github.com/go-redis/redis/v8" @@ -28,14 +29,39 @@ func getClient(isRPC bool) *ethclient.Client { return client } -// Creates connection to redis server & returns that handle to be used for further communication -func getPubSubClient() *redis.Client { +// Creates connection to Redis server & returns that handle to be used for further communication +func getRedisClient() *redis.Client { - return redis.NewClient(&redis.Options{ - Network: cfg.Get("RedisConnection"), - Addr: cfg.Get("RedisAddress"), - Password: cfg.Get("RedisPassword"), - DB: 0, - }) + var options *redis.Options + + // If password is given in config file + if cfg.Get("RedisPassword") != "" { + + options = &redis.Options{ + Network: cfg.Get("RedisConnection"), + Addr: cfg.Get("RedisAddress"), + Password: cfg.Get("RedisPassword"), + DB: 0, + } + + } else { + // If password is not given, attempting to connect with out it + // + // Though this is not recommended + options = &redis.Options{ + Network: cfg.Get("RedisConnection"), + Addr: cfg.Get("RedisAddress"), + DB: 0, + } + + } + + _redis := redis.NewClient(options) + // Checking whether connection was successful or not + if err := _redis.Ping(context.Background()).Err(); err != nil { + return nil + } + + return _redis } diff --git a/app/config/config.go b/app/config/config.go index 9c05c2f6..270dd782 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -34,9 +34,63 @@ func GetConcurrencyFactor() uint64 { return 1 } - if !(parsedFactor > 0) { - return 1 + return parsedFactor +} + +// GetBlockConfirmations - Number of block confirmations required +// before considering that block to be finalized, and can be persisted +// in a permanent data store +func GetBlockConfirmations() uint64 { + + confirmationCount := Get("BlockConfirmations") + if confirmationCount == "" { + return 0 } - return parsedFactor + parsedConfirmationCount, err := strconv.ParseUint(confirmationCount, 10, 64) + if err != nil { + log.Printf("[!] Failed to parse block confirmations : %s\n", err.Error()) + return 0 + } + + return parsedConfirmationCount + +} + +// GetBlockNumberRange - Returns how many blocks can be queried at a time +// when performing range based queries from client side +func GetBlockNumberRange() uint64 { + + blockRange := Get("BlockRange") + if blockRange == "" { + return 100 + } + + parsedBlockRange, err := strconv.ParseUint(blockRange, 10, 64) + if err != nil { + log.Printf("[!] Failed to parse block range : %s\n", err.Error()) + return 100 + } + + return parsedBlockRange + +} + +// GetTimeRange - Returns what's the max time span that can be used while performing query +// from client side, in terms of second +func GetTimeRange() uint64 { + + timeRange := Get("TimeRange") + if timeRange == "" { + return 3600 + } + + parsedTimeRange, err := strconv.ParseUint(timeRange, 10, 64) + if err != nil { + log.Printf("[!] 
Failed to parse time range : %s\n", err.Error()) + return 3600 + } + + return parsedTimeRange + } diff --git a/app/data/data.go b/app/data/data.go index b948d1e9..20190f79 100644 --- a/app/data/data.go +++ b/app/data/data.go @@ -21,6 +21,7 @@ type SyncState struct { StartedAt time.Time BlockCountAtStartUp uint64 NewBlocksInserted uint64 + LatestBlockNumber uint64 } // BlockCountInDB - Blocks currently present in database @@ -97,11 +98,34 @@ func (s *StatusHolder) Done() uint64 { } +// GetLatestBlockNumber - Attempting to safely read latest block number seen +func (s *StatusHolder) GetLatestBlockNumber() uint64 { + + s.Mutex.RLock() + defer s.Mutex.RUnlock() + + return s.State.LatestBlockNumber + +} + +// SetLatestBlockNumber - Attempting to safely write latest block number +func (s *StatusHolder) SetLatestBlockNumber(num uint64) { + + s.Mutex.Lock() + defer s.Mutex.Unlock() + + s.State.LatestBlockNumber = num + return + +} + // RedisInfo - Holds redis related information in this struct, to be used // when passing to functions as argument type RedisInfo struct { - Client *redis.Client // using this object `ette` will talk to Redis - QueueName string // retry queue name, for storing block numbers + Client *redis.Client // using this object `ette` will talk to Redis + BlockRetryQueueName string // retry queue name, for storing block numbers + UnfinalizedBlocksQueueName string // stores unfinalized block numbers, processes + // them later after reaching finality ( as set by deployer of `ette` ) } // ResultStatus - Keeps track of how many operations went successful diff --git a/app/db/block.go b/app/db/block.go index fb353df8..f270c3d8 100644 --- a/app/db/block.go +++ b/app/db/block.go @@ -1,6 +1,8 @@ package db import ( + "errors" + d "github.com/itzmeanjan/ette/app/data" "gorm.io/gorm" ) @@ -19,6 +21,10 @@ import ( // i.e. 
either whole block data is written or nothing is written func StoreBlock(dbWOTx *gorm.DB, block *PackedBlock, status *d.StatusHolder) error { + if block == nil { + return errors.New("Empty block received while attempting to persist") + } + // -- Starting DB transaction return dbWOTx.Transaction(func(dbWTx *gorm.DB) error { @@ -43,7 +49,7 @@ func StoreBlock(dbWOTx *gorm.DB, block *PackedBlock, status *d.StatusHolder) err if block.Transactions == nil { - // During 👆 flow, if we've really inserted any new block into database + // During 👆 flow, if we've really inserted a new block into database, // count will get updated if blockInserted { status.IncrementBlocksInserted() @@ -55,13 +61,13 @@ func StoreBlock(dbWOTx *gorm.DB, block *PackedBlock, status *d.StatusHolder) err for _, t := range block.Transactions { - if err := StoreTransaction(dbWOTx, dbWTx, t.Tx); err != nil { + if err := UpsertTransaction(dbWTx, t.Tx); err != nil { return err } for _, e := range t.Events { - if err := StoreEvent(dbWOTx, dbWTx, e); err != nil { + if err := UpsertEvent(dbWTx, e); err != nil { return err } @@ -69,7 +75,7 @@ func StoreBlock(dbWOTx *gorm.DB, block *PackedBlock, status *d.StatusHolder) err } - // During 👆 flow, if we've really inserted any new block into database + // During 👆 flow, if we've really inserted a new block into database, // count will get updated if blockInserted { status.IncrementBlocksInserted() diff --git a/app/db/event.go b/app/db/event.go index 0fe753c2..605b5ed1 100644 --- a/app/db/event.go +++ b/app/db/event.go @@ -4,54 +4,22 @@ import ( "errors" "gorm.io/gorm" + "gorm.io/gorm/clause" ) -// StoreEvent - Either creating new event entry or updating existing one -// depending upon whether entry exists already or not +// UpsertEvent - It may be the case previously this block was processed +// and event log also got persisted into database, which has been updated +// on chain, due to chain reorganization // -// This may be also case when nothing of 👆 is performed, because already -// we've updated data -// -// When reading from database, query not to be wrapped inside any transaction -// but while making any change, it'll be definitely protected using transaction -// to make whole block persistance operation little bit atomic -func StoreEvent(dbWOTx *gorm.DB, dbWTx *gorm.DB, event *Events) error { +// Now we want to persist latest entry into database, using upsert operation +// i.e. if entry doesn't exist yet, it'll be created, but if it does +// i.e. conflicting primary key found, then, all fields will be updated to latest value +func UpsertEvent(dbWTx *gorm.DB, event *Events) error { if event == nil { return errors.New("Empty event received while attempting to persist") } - persistedEvent := GetEvent(dbWOTx, event.Index, event.BlockHash) - if persistedEvent == nil { - return PutEvent(dbWTx, event) - } - - if !persistedEvent.SimilarTo(event) { - return UpdateEvent(dbWTx, event) - } - - return nil - -} - -// GetEvent - Given event index in block & block hash, returns event which is -// matching from database -func GetEvent(_db *gorm.DB, index uint, blockHash string) *Events { - var event Events - - if err := _db.Where("index = ? 
and blockhash = ?", index, blockHash).First(&event).Error; err != nil { - return nil - } - - return &event -} - -// PutEvent - Persists event log into database -func PutEvent(tx *gorm.DB, event *Events) error { - return tx.Create(event).Error -} + return dbWTx.Clauses(clause.OnConflict{UpdateAll: true}).Create(event).Error -// UpdateEvent - Updating already existing event data -func UpdateEvent(tx *gorm.DB, event *Events) error { - return tx.Where("index = ? and blockhash = ?", event.Index, event.BlockHash).Updates(event).Error } diff --git a/app/db/model.go b/app/db/model.go index 401147e2..e87f8dd2 100644 --- a/app/db/model.go +++ b/app/db/model.go @@ -1,7 +1,6 @@ package db import ( - "bytes" "encoding/json" "log" "time" @@ -56,17 +55,17 @@ func (b *Blocks) SimilarTo(_b *Blocks) bool { // Transactions - Blockchain transaction holder table model type Transactions struct { Hash string `gorm:"column:hash;type:char(66);primaryKey"` - From string `gorm:"column:from;type:char(42);not null"` - To string `gorm:"column:to;type:char(42)"` - Contract string `gorm:"column:contract;type:char(42)"` + From string `gorm:"column:from;type:char(42);not null;index"` + To string `gorm:"column:to;type:char(42);index"` + Contract string `gorm:"column:contract;type:char(42);index"` Value string `gorm:"column:value;type:varchar"` Data []byte `gorm:"column:data;type:bytea"` Gas uint64 `gorm:"column:gas;type:bigint;not null"` GasPrice string `gorm:"column:gasprice;type:varchar;not null"` Cost string `gorm:"column:cost;type:varchar;not null"` - Nonce uint64 `gorm:"column:nonce;type:bigint;not null"` + Nonce uint64 `gorm:"column:nonce;type:bigint;not null;index"` State uint64 `gorm:"column:state;type:smallint;not null"` - BlockHash string `gorm:"column:blockhash;type:char(66);not null"` + BlockHash string `gorm:"column:blockhash;type:char(66);not null;index"` Events Events `gorm:"foreignKey:txhash"` } @@ -75,43 +74,14 @@ func (Transactions) TableName() string { return "transactions" } -// SimilarTo - Checking equality of two transactions -func (t *Transactions) SimilarTo(_t *Transactions) bool { - if _t.To == "" { - return t.Hash == _t.Hash && - t.From == _t.From && - t.Contract == _t.Contract && - t.Value == _t.Value && - bytes.Compare(t.Data, _t.Data) == 0 && - t.Gas == _t.Gas && - t.GasPrice == _t.GasPrice && - t.Cost == _t.Cost && - t.Nonce == _t.Nonce && - t.State == _t.State && - t.BlockHash == _t.BlockHash - } - - return t.Hash == _t.Hash && - t.From == _t.From && - t.To == _t.To && - t.Value == _t.Value && - bytes.Compare(t.Data, _t.Data) == 0 && - t.Gas == _t.Gas && - t.GasPrice == _t.GasPrice && - t.Cost == _t.Cost && - t.Nonce == _t.Nonce && - t.State == _t.State && - t.BlockHash == _t.BlockHash -} - // Events - Events emitted from smart contracts to be held in this table type Events struct { - Origin string `gorm:"column:origin;type:char(42);not null"` + BlockHash string `gorm:"column:blockhash;type:char(66);not null;primaryKey"` Index uint `gorm:"column:index;type:integer;not null;primaryKey"` + Origin string `gorm:"column:origin;type:char(42);not null;index"` Topics pq.StringArray `gorm:"column:topics;type:text[];not null"` Data []byte `gorm:"column:data;type:bytea"` - TransactionHash string `gorm:"column:txhash;type:char(66);not null"` - BlockHash string `gorm:"column:blockhash;type:char(66);not null;primaryKey"` + TransactionHash string `gorm:"column:txhash;type:char(66);not null;index"` } // TableName - Overriding default table name @@ -119,34 +89,6 @@ func (Events) TableName() string { return 
"events" } -// SimilarTo - Checking equality of two events -func (e *Events) SimilarTo(_e *Events) bool { - - // Given two string arrays, it'll match it's elements by index & if all of them are same - // returns boolean result - compareStringArrays := func(arrayOne pq.StringArray, arrayTwo pq.StringArray) bool { - matched := true - - for k, v := range arrayOne { - - if v != arrayTwo[k] { - matched = false - break - } - - } - - return matched - } - - return e.Origin == _e.Origin && - e.Index == _e.Index && - compareStringArrays(e.Topics, _e.Topics) && - bytes.Compare(e.Data, _e.Data) == 0 && - e.TransactionHash == _e.TransactionHash && - e.BlockHash == _e.BlockHash -} - // PackedTransaction - All data that is stored in a tx, to be passed from // tx data fetcher to whole block data persist handler function type PackedTransaction struct { @@ -194,7 +136,7 @@ func (u *Users) ToJSON() []byte { type DeliveryHistory struct { ID uint64 `gorm:"column:id;type:bigserial;primaryKey"` Client string `gorm:"column:client;type:char(42);not null;index"` - TimeStamp time.Time `gorm:"column:ts;type:timestamp;not null"` + TimeStamp time.Time `gorm:"column:ts;type:timestamp;not null;index:,sort:asc"` EndPoint string `gorm:"column:endpoint;type:varchar(100);not null"` DataLength uint64 `gorm:"column:datalength;type:bigint;not null"` } @@ -223,7 +165,7 @@ func (SubscriptionPlans) TableName() string { // and `address` refers to address in users table type SubscriptionDetails struct { Address string `gorm:"column:address;type:char(42);primaryKey" json:"address"` - SubscriptionPlan uint32 `gorm:"column:subscriptionplan;type:int;not null" json:"subscriptionPlan"` + SubscriptionPlan uint32 `gorm:"column:subscriptionplan;type:int;not null;index" json:"subscriptionPlan"` } // TableName - Overriding default table name diff --git a/app/db/subscription.go b/app/db/subscription.go index 80f3651e..9a8c6301 100644 --- a/app/db/subscription.go +++ b/app/db/subscription.go @@ -147,6 +147,19 @@ func CheckSubscriptionPlanDetailsByAddress(_db *gorm.DB, address common.Address) return &plan } +// GetAllowedDeliveryCountByAddress - Returns how many deliveries can be made to user +// in 24 hours, as per plan they're subscribed to +func GetAllowedDeliveryCountByAddress(_db *gorm.DB, address common.Address) uint64 { + + plan := CheckSubscriptionPlanDetailsByAddress(_db, address) + if plan == nil { + return 0 + } + + return plan.DeliveryCount + +} + // IsValidSubscriptionPlan - Given subscription plan id, checking against // database whether it's a valid one or not func IsValidSubscriptionPlan(_db *gorm.DB, id uint32) bool { diff --git a/app/db/transaction.go b/app/db/transaction.go index 08cecc94..253a9361 100644 --- a/app/db/transaction.go +++ b/app/db/transaction.go @@ -4,50 +4,22 @@ import ( "errors" "gorm.io/gorm" + "gorm.io/gorm/clause" ) -// StoreTransaction - Stores specific tx into database, where presence of tx -// in database being performed using db handle ( i.e. `dbWOTx` ) which is not -// wrapped inside database transaction, but whenever performing any writing to database -// uses db handle which is always passed to db engine, wrapped inside a db transaction -// i.e. 
`dbWTx`, which protects helps us in rolling back to previous state, in case of some failure -// faced during this persistance stage -func StoreTransaction(dbWOTx *gorm.DB, dbWTx *gorm.DB, tx *Transactions) error { +// UpsertTransaction - It may be the case previously this block was processed +// and transaction also got persisted into database, which has been updated +// on chain, due to chain reorganization +// +// Now we want to persist latest entry into database, using upsert operation +// i.e. if entry doesn't exist yet, it'll be created, but if it does +// i.e. conflicting primary key found, then, all fields will be updated to latest value +func UpsertTransaction(dbWTx *gorm.DB, tx *Transactions) error { if tx == nil { return errors.New("Empty transaction received while attempting to persist") } - persistedTx := GetTransaction(dbWOTx, tx.BlockHash, tx.BlockHash) - if persistedTx == nil { - return PutTransaction(dbWTx, tx) - } - - if !persistedTx.SimilarTo(tx) { - return UpdateTransaction(dbWTx, tx) - } - - return nil - -} - -// GetTransaction - Fetches tx entry from database, given txhash & containing block hash -func GetTransaction(_db *gorm.DB, blkHash string, txHash string) *Transactions { - var tx Transactions - - if err := _db.Where("hash = ? and blockhash = ?", txHash, blkHash).First(&tx).Error; err != nil { - return nil - } - - return &tx -} - -// PutTransaction - Persisting transaction -func PutTransaction(tx *gorm.DB, txn *Transactions) error { - return tx.Create(txn).Error -} + return dbWTx.Clauses(clause.OnConflict{UpdateAll: true}).Create(tx).Error -// UpdateTransaction - Updating already persisted transaction -func UpdateTransaction(tx *gorm.DB, txn *Transactions) error { - return tx.Where("hash = ? and blockhash = ?", txn.Hash, txn.BlockHash).Updates(txn).Error } diff --git a/app/db/user.go b/app/db/user.go index 9aff1a8c..0ad641aa 100644 --- a/app/db/user.go +++ b/app/db/user.go @@ -121,6 +121,6 @@ func IsUnderRateLimit(_db *gorm.DB, userAddress string) bool { return false } - // 50k times data delivered to client in a day - return count < 50000 + // Compare it with allowed rate count per 24 hours, of plan user is subscribed to + return count < int64(GetAllowedDeliveryCountByAddress(_db, common.HexToAddress(userAddress))) } diff --git a/app/rest/graph/schema.resolvers.go b/app/rest/graph/schema.resolvers.go index d7325b68..449b953f 100644 --- a/app/rest/graph/schema.resolvers.go +++ b/app/rest/graph/schema.resolvers.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/ethereum/go-ethereum/common" + cfg "github.com/itzmeanjan/ette/app/config" _db "github.com/itzmeanjan/ette/app/db" "github.com/itzmeanjan/ette/app/rest/graph/generated" "github.com/itzmeanjan/ette/app/rest/graph/model" @@ -33,7 +34,7 @@ func (r *queryResolver) BlockByNumber(ctx context.Context, number string) (*mode } func (r *queryResolver) BlocksByNumberRange(ctx context.Context, from string, to string) ([]*model.Block, error) { - _from, _to, err := rangeChecker(from, to, 10) + _from, _to, err := rangeChecker(from, to, cfg.GetBlockNumberRange()) if err != nil { return nil, errors.New("Bad Block Number Range") } @@ -42,7 +43,7 @@ func (r *queryResolver) BlocksByNumberRange(ctx context.Context, from string, to } func (r *queryResolver) BlocksByTimeRange(ctx context.Context, from string, to string) ([]*model.Block, error) { - _from, _to, err := rangeChecker(from, to, 60) + _from, _to, err := rangeChecker(from, to, cfg.GetTimeRange()) if err != nil { return nil, errors.New("Bad Block Timestamp Range") } @@ 
-80,7 +81,7 @@ func (r *queryResolver) TransactionsFromAccountByNumberRange(ctx context.Context return nil, errors.New("Bad Account Address") } - _from, _to, err := rangeChecker(from, to, 100) + _from, _to, err := rangeChecker(from, to, cfg.GetBlockNumberRange()) if err != nil { return nil, errors.New("Bad Block Number Range") } @@ -93,7 +94,7 @@ func (r *queryResolver) TransactionsFromAccountByTimeRange(ctx context.Context, return nil, errors.New("Bad Account Address") } - _from, _to, err := rangeChecker(from, to, 600) + _from, _to, err := rangeChecker(from, to, cfg.GetTimeRange()) if err != nil { return nil, errors.New("Bad Block Timestamp Range") } @@ -106,7 +107,7 @@ func (r *queryResolver) TransactionsToAccountByNumberRange(ctx context.Context, return nil, errors.New("Bad Account Address") } - _from, _to, err := rangeChecker(from, to, 100) + _from, _to, err := rangeChecker(from, to, cfg.GetBlockNumberRange()) if err != nil { return nil, errors.New("Bad Block Number Range") } @@ -119,7 +120,7 @@ func (r *queryResolver) TransactionsToAccountByTimeRange(ctx context.Context, ac return nil, errors.New("Bad Account Address") } - _from, _to, err := rangeChecker(from, to, 600) + _from, _to, err := rangeChecker(from, to, cfg.GetTimeRange()) if err != nil { return nil, errors.New("Bad Block Timestamp Range") } @@ -136,7 +137,7 @@ func (r *queryResolver) TransactionsBetweenAccountsByNumberRange(ctx context.Con return nil, errors.New("Bad To Account Address") } - _from, _to, err := rangeChecker(from, to, 100) + _from, _to, err := rangeChecker(from, to, cfg.GetBlockNumberRange()) if err != nil { return nil, errors.New("Bad Block Number Range") } @@ -153,7 +154,7 @@ func (r *queryResolver) TransactionsBetweenAccountsByTimeRange(ctx context.Conte return nil, errors.New("Bad To Account Address") } - _from, _to, err := rangeChecker(from, to, 600) + _from, _to, err := rangeChecker(from, to, cfg.GetTimeRange()) if err != nil { return nil, errors.New("Bad Block Timestamp Range") } @@ -166,7 +167,7 @@ func (r *queryResolver) ContractsCreatedFromAccountByNumberRange(ctx context.Con return nil, errors.New("Bad Account Address") } - _from, _to, err := rangeChecker(from, to, 100) + _from, _to, err := rangeChecker(from, to, cfg.GetBlockNumberRange()) if err != nil { return nil, errors.New("Bad Block Number Range") } @@ -179,7 +180,7 @@ func (r *queryResolver) ContractsCreatedFromAccountByTimeRange(ctx context.Conte return nil, errors.New("Bad Account Address") } - _from, _to, err := rangeChecker(from, to, 600) + _from, _to, err := rangeChecker(from, to, cfg.GetTimeRange()) if err != nil { return nil, errors.New("Bad Block Timestamp Range") } @@ -205,7 +206,7 @@ func (r *queryResolver) EventsFromContractByNumberRange(ctx context.Context, con return nil, errors.New("Bad Contract Address") } - _from, _to, err := rangeChecker(from, to, 10) + _from, _to, err := rangeChecker(from, to, cfg.GetBlockNumberRange()) if err != nil { return nil, errors.New("Bad Block Number Range") } @@ -218,7 +219,7 @@ func (r *queryResolver) EventsFromContractByTimeRange(ctx context.Context, contr return nil, errors.New("Bad Contract Address") } - _from, _to, err := rangeChecker(from, to, 60) + _from, _to, err := rangeChecker(from, to, cfg.GetTimeRange()) if err != nil { return nil, errors.New("Bad Block Timestamp Range") } @@ -247,7 +248,7 @@ func (r *queryResolver) EventsFromContractWithTopicsByNumberRange(ctx context.Co return nil, errors.New("Bad Contract Address") } - _from, _to, err := rangeChecker(from, to, 10) + _from, _to, err 
:= rangeChecker(from, to, cfg.GetBlockNumberRange()) if err != nil { return nil, errors.New("Bad Block Number Range") } @@ -260,7 +261,7 @@ func (r *queryResolver) EventsFromContractWithTopicsByTimeRange(ctx context.Cont return nil, errors.New("Bad Contract Address") } - _from, _to, err := rangeChecker(from, to, 60) + _from, _to, err := rangeChecker(from, to, cfg.GetTimeRange()) if err != nil { return nil, errors.New("Bad Block Timestamp Range") } diff --git a/app/rest/rest.go b/app/rest/rest.go index 6163bcf5..89ae666c 100644 --- a/app/rest/rest.go +++ b/app/rest/rest.go @@ -584,7 +584,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl if fromBlock != "" && toBlock != "" { - _from, _to, err := rangeChecker(fromBlock, toBlock, 10) + _from, _to, err := rangeChecker(fromBlock, toBlock, cfg.GetBlockNumberRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block number range", @@ -610,7 +610,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl if fromTime != "" && toTime != "" { - _from, _to, err := rangeChecker(fromTime, toTime, 60) + _from, _to, err := rangeChecker(fromTime, toTime, cfg.GetTimeRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block time range", @@ -702,7 +702,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // ( i.e. deployer ) with in given block number range if fromBlock != "" && toBlock != "" && strings.HasPrefix(deployer, "0x") && len(deployer) == 42 { - _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, 100) + _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, cfg.GetBlockNumberRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block number range", @@ -726,7 +726,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // ( i.e. 
deployer ) with in given time frame if fromTime != "" && toTime != "" && strings.HasPrefix(deployer, "0x") && len(deployer) == 42 { - _fromTime, _toTime, err := rangeChecker(fromTime, toTime, 600) + _fromTime, _toTime, err := rangeChecker(fromTime, toTime, cfg.GetTimeRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block time range", @@ -750,7 +750,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // between that pair, where `from` & `to` fields are fixed if fromBlock != "" && toBlock != "" && strings.HasPrefix(fromAccount, "0x") && len(fromAccount) == 42 && strings.HasPrefix(toAccount, "0x") && len(toAccount) == 42 { - _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, 100) + _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, cfg.GetBlockNumberRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block number range", @@ -774,7 +774,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // between that pair, where `from` & `to` fields are fixed if fromTime != "" && toTime != "" && strings.HasPrefix(fromAccount, "0x") && len(fromAccount) == 42 && strings.HasPrefix(toAccount, "0x") && len(toAccount) == 42 { - _fromTime, _toTime, err := rangeChecker(fromTime, toTime, 600) + _fromTime, _toTime, err := rangeChecker(fromTime, toTime, cfg.GetTimeRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block time range", @@ -798,7 +798,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // from account if fromBlock != "" && toBlock != "" && strings.HasPrefix(fromAccount, "0x") && len(fromAccount) == 42 { - _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, 100) + _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, cfg.GetBlockNumberRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block number range", @@ -822,7 +822,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // from this account in that given time span if fromTime != "" && toTime != "" && strings.HasPrefix(fromAccount, "0x") && len(fromAccount) == 42 { - _fromTime, _toTime, err := rangeChecker(fromTime, toTime, 600) + _fromTime, _toTime, err := rangeChecker(fromTime, toTime, cfg.GetTimeRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block time range", @@ -846,7 +846,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // to this account in that block range if fromBlock != "" && toBlock != "" && strings.HasPrefix(toAccount, "0x") && len(toAccount) == 42 { - _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, 100) + _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, cfg.GetBlockNumberRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block number range", @@ -870,7 +870,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // to this account in that given time span if fromTime != "" && toTime != "" && strings.HasPrefix(toAccount, "0x") && len(toAccount) == 42 { - _fromTime, _toTime, err := rangeChecker(fromBlock, toBlock, 600) + _fromTime, _toTime, err := rangeChecker(fromBlock, toBlock, cfg.GetTimeRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block time range", @@ -981,7 +981,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // events satisfying criteria if fromBlock != "" && toBlock != 
"" && strings.HasPrefix(contract, "0x") && len(contract) == 42 && ((strings.HasPrefix(topic0, "0x") && len(topic0) == 66) || (strings.HasPrefix(topic1, "0x") && len(topic1) == 66) || (strings.HasPrefix(topic2, "0x") && len(topic2) == 66) || (strings.HasPrefix(topic3, "0x") && len(topic3) == 66)) { - _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, 10) + _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, cfg.GetBlockNumberRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block number range", @@ -1005,7 +1005,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // events satisfying criteria if fromTime != "" && toTime != "" && strings.HasPrefix(contract, "0x") && len(contract) == 42 && ((strings.HasPrefix(topic0, "0x") && len(topic0) == 66) || (strings.HasPrefix(topic1, "0x") && len(topic1) == 66) || (strings.HasPrefix(topic2, "0x") && len(topic2) == 66) || (strings.HasPrefix(topic3, "0x") && len(topic3) == 66)) { - _fromTime, _toTime, err := rangeChecker(fromTime, toTime, 600) + _fromTime, _toTime, err := rangeChecker(fromTime, toTime, cfg.GetTimeRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block time range", @@ -1028,7 +1028,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // Given block number range & contract address, finds out all events emitted by this contract if fromBlock != "" && toBlock != "" && strings.HasPrefix(contract, "0x") && len(contract) == 42 { - _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, 10) + _fromBlock, _toBlock, err := rangeChecker(fromBlock, toBlock, cfg.GetBlockNumberRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block number range", @@ -1052,7 +1052,7 @@ func RunHTTPServer(_db *gorm.DB, _status *d.StatusHolder, _redisClient *redis.Cl // events emitted by this contract during time span if fromTime != "" && toTime != "" && strings.HasPrefix(contract, "0x") && len(contract) == 42 { - _fromTime, _toTime, err := rangeChecker(fromTime, toTime, 600) + _fromTime, _toTime, err := rangeChecker(fromTime, toTime, cfg.GetTimeRange()) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "msg": "Bad block time range", diff --git a/db/schema.sql b/db/schema.sql index 2ddb1052..c27afe1b 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -36,6 +36,12 @@ create table transactions ( foreign key (blockhash) references blocks(hash) ); +create index on transactions(from); +create index on transactions(to); +create index on transactions(contract); +create index on transactions(nonce); +create index on transactions(blockhash); + create table events ( origin char(42) not null, index integer not null, @@ -48,6 +54,9 @@ create table events ( foreign key (blockhash) references blocks(hash) ); +create index on events(origin); +create index on events(txhash); + create table users ( address char(42) not null, apikey char(66) primary key, @@ -66,6 +75,7 @@ create table delivery_history ( ); create index on delivery_history(client); +create index on delivery_history(ts asc); create table subscription_plans ( id serial primary key, @@ -78,3 +88,5 @@ create table subscription_details ( subscriptionplan int not null, foreign key (subscriptionplan) references subscription_plans(id) ); + +create index on subscription_details(subscriptionplan);