From b98af0196fcfa090f5fd5fc0bda9ced29ce6372f Mon Sep 17 00:00:00 2001 From: Kamil Mukhametzyanov <grkamil@yandex.ru> Date: Wed, 21 Oct 2020 14:04:00 +0300 Subject: [PATCH] refactor ws & fix kicked stakes --- .env.prod | 24 ++++++++++----------- api/v2/addresses/handler.go | 6 +++--- balance/service.go | 4 ++++ cmd/explorer.go | 18 +++++++--------- core/enviroment.go | 4 ++-- core/main.go | 10 ++++----- core/ws/blocks.go | 39 ++++++++++++++++++++++++++++++++++ core/{extender.go => ws/ws.go} | 5 +++-- stake/repository.go | 8 ++++++- tools/cache/cache.go | 10 +-------- tools/metrics/block.go | 10 +++------ 11 files changed, 87 insertions(+), 51 deletions(-) create mode 100644 core/ws/blocks.go rename core/{extender.go => ws/ws.go} (86%) diff --git a/.env.prod b/.env.prod index b96f52c8..0cdc8817 100644 --- a/.env.prod +++ b/.env.prod @@ -1,12 +1,12 @@ -EXPLORER_DEBUG=ME_DEBUG -APP_BASE_COIN=ME_BASE_COIN -CENTRIFUGO_LINK=ME_EXTENDER_WS_ADDRESS -CENTRIFUGO_BLOCK_CHANNEL=ME_EXTENDER_WS_CHANNEL_BLOCKS -DB_HOST=ME_DB_HOST -DB_PORT=ME_DB_PORT -DB_POOL_SIZE=ME_DB_POOL_SIZE -DB_NAME=ME_DB_NAME -DB_USER=ME_DB_USER -DB_PASSWORD=ME_DB_PASSWORD -EXPLORER_PORT=ME_SRV_PORT -MARKET_HOST=ME_MARKET_HOST \ No newline at end of file +EXPLORER_DEBUG= +APP_BASE_COIN= +CENTRIFUGO_LINK= +CENTRIFUGO_BLOCK_CHANNEL= +DB_HOST= +DB_PORT= +DB_POOL_SIZE= +DB_NAME= +DB_USER= +DB_PASSWORD= +EXPLORER_PORT= +MARKET_HOST= \ No newline at end of file diff --git a/api/v2/addresses/handler.go b/api/v2/addresses/handler.go index 667d266d..8b5d9f1a 100644 --- a/api/v2/addresses/handler.go +++ b/api/v2/addresses/handler.go @@ -78,7 +78,7 @@ func GetAddresses(c *gin.Context) { addresses := explorer.AddressRepository.GetByAddresses(minterAddresses) for k, addr := range addresses { - addresses[k] = extendModelWithBaseSymbolBalance(addr, addr.Address, explorer.Environment.BaseCoin) + addresses[k] = extendModelWithBaseSymbolBalance(addr, addr.Address, explorer.Environment.Basecoin) } // extend the model array with empty model if not exists @@ -88,7 +88,7 @@ func GetAddresses(c *gin.Context) { continue } - addresses = append(addresses, makeEmptyAddressModel(item, explorer.Environment.BaseCoin)) + addresses = append(addresses, makeEmptyAddressModel(item, explorer.Environment.Basecoin)) } } @@ -117,7 +117,7 @@ func GetAddress(c *gin.Context) { // fetch address model := explorer.AddressRepository.GetByAddress(*minterAddress) - model = extendModelWithBaseSymbolBalance(model, *minterAddress, explorer.Environment.BaseCoin) + model = extendModelWithBaseSymbolBalance(model, *minterAddress, explorer.Environment.Basecoin) // calculate overall address balance in base coin and fiat if request.WithSum { diff --git a/balance/service.go b/balance/service.go index df088a58..d4f6dc80 100644 --- a/balance/service.go +++ b/balance/service.go @@ -42,6 +42,10 @@ func (s *Service) GetStakeBalance(stakes []models.Stake) *big.Float { sum := big.NewInt(0) for _, stake := range stakes { + if stake.IsKicked { + continue + } + // just add base coin to sum if stake.Coin.Symbol == s.baseCoin { sum = sum.Add(sum, helpers.StringToBigInt(stake.Value)) diff --git a/cmd/explorer.go b/cmd/explorer.go index 83926f08..3b76eae8 100644 --- a/cmd/explorer.go +++ b/cmd/explorer.go @@ -3,6 +3,7 @@ package main import ( "github.com/MinterTeam/minter-explorer-api/v2/api" "github.com/MinterTeam/minter-explorer-api/v2/core" + "github.com/MinterTeam/minter-explorer-api/v2/core/ws" "github.com/MinterTeam/minter-explorer-api/v2/database" "github.com/MinterTeam/minter-explorer-api/v2/tools/metrics" "github.com/joho/godotenv" @@ -29,22 +30,19 @@ func main() { go explorer.MarketService.Run() // create ws extender - extender := core.NewExtenderWsClient(explorer) + extender := ws.NewExtenderWsClient(explorer) defer extender.Close() + // create ws channel handler + blocksChannelHandler := ws.NewBlocksChannelHandler() + blocksChannelHandler.AddSubscriber(explorer.Cache) + blocksChannelHandler.AddSubscriber(metrics.NewLastBlockMetric()) + // subscribe to channel and add cache handler sub := extender.CreateSubscription(explorer.Environment.WsBlocksChannel) - sub.OnPublish(explorer.Cache) + sub.OnPublish(blocksChannelHandler) extender.Subscribe(sub) - // TODO: refactor - // create ws extender for metrics - extender2 := core.NewExtenderWsClient(explorer) - defer extender2.Close() - metricSub := extender2.CreateSubscription(explorer.Environment.WsBlocksChannel) - metricSub.OnPublish(metrics.NewLastBlockMetric()) - extender2.Subscribe(metricSub) - // run api api.Run(db, explorer) } diff --git a/core/enviroment.go b/core/enviroment.go index 0b98ca22..d0a89e65 100644 --- a/core/enviroment.go +++ b/core/enviroment.go @@ -13,7 +13,7 @@ type Environment struct { DbPoolSize int DbHost string DbPort string - BaseCoin string + Basecoin string ServerPort string IsDebug bool WsServer string @@ -34,7 +34,7 @@ func NewEnvironment() *Environment { DbPort: os.Getenv("DB_PORT"), DbPoolSize: int(dbPoolSize), DbHost: os.Getenv("DB_HOST"), - BaseCoin: os.Getenv("APP_BASE_COIN"), + Basecoin: os.Getenv("APP_BASE_COIN"), ServerPort: os.Getenv("EXPLORER_PORT"), IsDebug: os.Getenv("EXPLORER_DEBUG") == "1", WsServer: os.Getenv("CENTRIFUGO_LINK"), diff --git a/core/main.go b/core/main.go index e90523b8..3ccdb5d6 100644 --- a/core/main.go +++ b/core/main.go @@ -40,17 +40,17 @@ type Explorer struct { } func NewExplorer(db *pg.DB, env *Environment) *Explorer { - marketService := market.NewService(coingecko.NewService(env.MarketHost), env.BaseCoin) + marketService := market.NewService(coingecko.NewService(env.MarketHost), env.Basecoin) blockRepository := *blocks.NewRepository(db) validatorRepository := validator.NewRepository(db) stakeRepository := stake.NewRepository(db) cacheService := cache.NewCache(blockRepository.GetLastBlock()) - coinRepository := *coins.NewRepository(db) - transactionService := transaction.NewService(&coinRepository) + coinRepository := coins.NewRepository(db) + transactionService := transaction.NewService(coinRepository) return &Explorer{ BlockRepository: blockRepository, - CoinRepository: coinRepository, + CoinRepository: *coinRepository, AddressRepository: *address.NewRepository(db), TransactionRepository: *transaction.NewRepository(db), InvalidTransactionRepository: *invalid_transaction.NewRepository(db), @@ -62,7 +62,7 @@ func NewExplorer(db *pg.DB, env *Environment) *Explorer { Cache: cacheService, MarketService: marketService, TransactionService: transactionService, - BalanceService: balance.NewService(env.BaseCoin, marketService), + BalanceService: balance.NewService(env.Basecoin, marketService), ValidatorService: services.NewValidatorService(validatorRepository, stakeRepository, cacheService), UnbondRepository: unbond.NewRepository(db), StakeService: stake.NewService(stakeRepository), diff --git a/core/ws/blocks.go b/core/ws/blocks.go new file mode 100644 index 00000000..ead0cf95 --- /dev/null +++ b/core/ws/blocks.go @@ -0,0 +1,39 @@ +package ws + +import ( + "encoding/json" + "github.com/MinterTeam/minter-explorer-api/v2/blocks" + "github.com/MinterTeam/minter-explorer-api/v2/helpers" + "github.com/centrifugal/centrifuge-go" +) + +type BlocksChannelHandler struct { + Subscribers []NewBlockSubscriber +} + +type NewBlockSubscriber interface { + OnNewBlock(blocks.Resource) +} + +// Constructor for blocks event channel handler +func NewBlocksChannelHandler() *BlocksChannelHandler { + return &BlocksChannelHandler{ + Subscribers: make([]NewBlockSubscriber, 0), + } +} + +// Add new subscriber for channel +func (b *BlocksChannelHandler) AddSubscriber(sub NewBlockSubscriber) { + b.Subscribers = append(b.Subscribers, sub) +} + +// Handle new block from ws and publish him to all subscribers +func (b *BlocksChannelHandler) OnPublish(sub *centrifuge.Subscription, e centrifuge.PublishEvent) { + var block blocks.Resource + err := json.Unmarshal(e.Data, &block) + helpers.CheckErr(err) + + for _, sub := range b.Subscribers { + go sub.OnNewBlock(block) + } +} diff --git a/core/extender.go b/core/ws/ws.go similarity index 86% rename from core/extender.go rename to core/ws/ws.go index 35171678..d02cf60e 100644 --- a/core/extender.go +++ b/core/ws/ws.go @@ -1,6 +1,7 @@ -package core +package ws import ( + "github.com/MinterTeam/minter-explorer-api/v2/core" "github.com/MinterTeam/minter-explorer-api/v2/helpers" "github.com/centrifugal/centrifuge-go" ) @@ -10,7 +11,7 @@ type ExtenderWsClient struct { } // create new extender connection -func NewExtenderWsClient(explorer *Explorer) *ExtenderWsClient { +func NewExtenderWsClient(explorer *core.Explorer) *ExtenderWsClient { c := centrifuge.New(explorer.Environment.WsServer, centrifuge.DefaultConfig()) err := c.Connect() diff --git a/stake/repository.go b/stake/repository.go index adff06be..2fe3fc6e 100644 --- a/stake/repository.go +++ b/stake/repository.go @@ -32,7 +32,12 @@ func (repository Repository) GetAllByAddress(address string) ([]models.Stake, er // Get total delegated bip value func (repository Repository) GetSumInBipValue() (string, error) { var sum string - err := repository.db.Model(&models.Stake{}).ColumnExpr("SUM(bip_value)").Select(&sum) + + err := repository.db.Model(&models.Stake{}). + Where("is_kicked = false"). + ColumnExpr("SUM(bip_value)"). + Select(&sum) + return sum, err } @@ -42,6 +47,7 @@ func (repository Repository) GetSumInBipValueByAddress(address string) (string, err := repository.db.Model(&models.Stake{}). Column("OwnerAddress._"). ColumnExpr("SUM(bip_value)"). + Where("is_kicked = false"). Where("owner_address.address = ?", address). Select(&sum) diff --git a/tools/cache/cache.go b/tools/cache/cache.go index 008ea7b1..8955e380 100644 --- a/tools/cache/cache.go +++ b/tools/cache/cache.go @@ -1,11 +1,8 @@ package cache import ( - "encoding/json" "github.com/MinterTeam/minter-explorer-api/v2/blocks" - "github.com/MinterTeam/minter-explorer-api/v2/helpers" "github.com/MinterTeam/minter-explorer-extender/v2/models" - "github.com/centrifugal/centrifuge-go" "sync" "time" ) @@ -83,11 +80,6 @@ func (c *ExplorerCache) GetLastBlock() blocks.Resource { } // update last block id by ws data -func (c *ExplorerCache) OnPublish(sub *centrifuge.Subscription, e centrifuge.PublishEvent) { - var block blocks.Resource - err := json.Unmarshal(e.Data, &block) - helpers.CheckErr(err) - - // update last block id +func (c *ExplorerCache) OnNewBlock(block blocks.Resource) { c.SetLastBlock(block) } diff --git a/tools/metrics/block.go b/tools/metrics/block.go index ea7ac6ce..3a7a98bf 100644 --- a/tools/metrics/block.go +++ b/tools/metrics/block.go @@ -1,10 +1,8 @@ package metrics import ( - "encoding/json" "github.com/MinterTeam/minter-explorer-api/v2/blocks" "github.com/MinterTeam/minter-explorer-api/v2/helpers" - "github.com/centrifugal/centrifuge-go" "github.com/prometheus/client_golang/prometheus" "time" ) @@ -14,6 +12,7 @@ type LastBlockMetric struct { time prometheus.Gauge } +// Constructor for prometheus metrics func NewLastBlockMetric() *LastBlockMetric { prometheusLastBlockIdMetric := prometheus.NewGauge( prometheus.GaugeOpts{ @@ -36,11 +35,8 @@ func NewLastBlockMetric() *LastBlockMetric { } } -func (m *LastBlockMetric) OnPublish(sub *centrifuge.Subscription, e centrifuge.PublishEvent) { - var block blocks.Resource - err := json.Unmarshal(e.Data, &block) - helpers.CheckErr(err) - +// Update last block for prometheus metric +func (m *LastBlockMetric) OnNewBlock(block blocks.Resource) { blockTime, err := time.Parse(time.RFC3339, block.Timestamp) helpers.CheckErr(err)