Skip to content

Commit

Permalink
refactor ws & fix kicked stakes
Browse files Browse the repository at this point in the history
  • Loading branch information
grkamil committed Oct 21, 2020
1 parent c27cb80 commit b98af01
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 51 deletions.
24 changes: 12 additions & 12 deletions .env.prod
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
EXPLORER_DEBUG=ME_DEBUG
APP_BASE_COIN=ME_BASE_COIN
CENTRIFUGO_LINK=ME_EXTENDER_WS_ADDRESS
CENTRIFUGO_BLOCK_CHANNEL=ME_EXTENDER_WS_CHANNEL_BLOCKS
DB_HOST=ME_DB_HOST
DB_PORT=ME_DB_PORT
DB_POOL_SIZE=ME_DB_POOL_SIZE
DB_NAME=ME_DB_NAME
DB_USER=ME_DB_USER
DB_PASSWORD=ME_DB_PASSWORD
EXPLORER_PORT=ME_SRV_PORT
MARKET_HOST=ME_MARKET_HOST
EXPLORER_DEBUG=
APP_BASE_COIN=
CENTRIFUGO_LINK=
CENTRIFUGO_BLOCK_CHANNEL=
DB_HOST=
DB_PORT=
DB_POOL_SIZE=
DB_NAME=
DB_USER=
DB_PASSWORD=
EXPLORER_PORT=
MARKET_HOST=
6 changes: 3 additions & 3 deletions api/v2/addresses/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func GetAddresses(c *gin.Context) {
addresses := explorer.AddressRepository.GetByAddresses(minterAddresses)

for k, addr := range addresses {
addresses[k] = extendModelWithBaseSymbolBalance(addr, addr.Address, explorer.Environment.BaseCoin)
addresses[k] = extendModelWithBaseSymbolBalance(addr, addr.Address, explorer.Environment.Basecoin)
}

// extend the model array with empty model if not exists
Expand All @@ -88,7 +88,7 @@ func GetAddresses(c *gin.Context) {
continue
}

addresses = append(addresses, makeEmptyAddressModel(item, explorer.Environment.BaseCoin))
addresses = append(addresses, makeEmptyAddressModel(item, explorer.Environment.Basecoin))
}
}

Expand Down Expand Up @@ -117,7 +117,7 @@ func GetAddress(c *gin.Context) {

// fetch address
model := explorer.AddressRepository.GetByAddress(*minterAddress)
model = extendModelWithBaseSymbolBalance(model, *minterAddress, explorer.Environment.BaseCoin)
model = extendModelWithBaseSymbolBalance(model, *minterAddress, explorer.Environment.Basecoin)

// calculate overall address balance in base coin and fiat
if request.WithSum {
Expand Down
4 changes: 4 additions & 0 deletions balance/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (s *Service) GetStakeBalance(stakes []models.Stake) *big.Float {
sum := big.NewInt(0)

for _, stake := range stakes {
if stake.IsKicked {
continue
}

// just add base coin to sum
if stake.Coin.Symbol == s.baseCoin {
sum = sum.Add(sum, helpers.StringToBigInt(stake.Value))
Expand Down
18 changes: 8 additions & 10 deletions cmd/explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"github.com/MinterTeam/minter-explorer-api/v2/api"
"github.com/MinterTeam/minter-explorer-api/v2/core"
"github.com/MinterTeam/minter-explorer-api/v2/core/ws"
"github.com/MinterTeam/minter-explorer-api/v2/database"
"github.com/MinterTeam/minter-explorer-api/v2/tools/metrics"
"github.com/joho/godotenv"
Expand All @@ -29,22 +30,19 @@ func main() {
go explorer.MarketService.Run()

// create ws extender
extender := core.NewExtenderWsClient(explorer)
extender := ws.NewExtenderWsClient(explorer)
defer extender.Close()

// create ws channel handler
blocksChannelHandler := ws.NewBlocksChannelHandler()
blocksChannelHandler.AddSubscriber(explorer.Cache)
blocksChannelHandler.AddSubscriber(metrics.NewLastBlockMetric())

// subscribe to channel and add cache handler
sub := extender.CreateSubscription(explorer.Environment.WsBlocksChannel)
sub.OnPublish(explorer.Cache)
sub.OnPublish(blocksChannelHandler)
extender.Subscribe(sub)

// TODO: refactor
// create ws extender for metrics
extender2 := core.NewExtenderWsClient(explorer)
defer extender2.Close()
metricSub := extender2.CreateSubscription(explorer.Environment.WsBlocksChannel)
metricSub.OnPublish(metrics.NewLastBlockMetric())
extender2.Subscribe(metricSub)

// run api
api.Run(db, explorer)
}
4 changes: 2 additions & 2 deletions core/enviroment.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Environment struct {
DbPoolSize int
DbHost string
DbPort string
BaseCoin string
Basecoin string
ServerPort string
IsDebug bool
WsServer string
Expand All @@ -34,7 +34,7 @@ func NewEnvironment() *Environment {
DbPort: os.Getenv("DB_PORT"),
DbPoolSize: int(dbPoolSize),
DbHost: os.Getenv("DB_HOST"),
BaseCoin: os.Getenv("APP_BASE_COIN"),
Basecoin: os.Getenv("APP_BASE_COIN"),
ServerPort: os.Getenv("EXPLORER_PORT"),
IsDebug: os.Getenv("EXPLORER_DEBUG") == "1",
WsServer: os.Getenv("CENTRIFUGO_LINK"),
Expand Down
10 changes: 5 additions & 5 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ type Explorer struct {
}

func NewExplorer(db *pg.DB, env *Environment) *Explorer {
marketService := market.NewService(coingecko.NewService(env.MarketHost), env.BaseCoin)
marketService := market.NewService(coingecko.NewService(env.MarketHost), env.Basecoin)
blockRepository := *blocks.NewRepository(db)
validatorRepository := validator.NewRepository(db)
stakeRepository := stake.NewRepository(db)
cacheService := cache.NewCache(blockRepository.GetLastBlock())
coinRepository := *coins.NewRepository(db)
transactionService := transaction.NewService(&coinRepository)
coinRepository := coins.NewRepository(db)
transactionService := transaction.NewService(coinRepository)

return &Explorer{
BlockRepository: blockRepository,
CoinRepository: coinRepository,
CoinRepository: *coinRepository,
AddressRepository: *address.NewRepository(db),
TransactionRepository: *transaction.NewRepository(db),
InvalidTransactionRepository: *invalid_transaction.NewRepository(db),
Expand All @@ -62,7 +62,7 @@ func NewExplorer(db *pg.DB, env *Environment) *Explorer {
Cache: cacheService,
MarketService: marketService,
TransactionService: transactionService,
BalanceService: balance.NewService(env.BaseCoin, marketService),
BalanceService: balance.NewService(env.Basecoin, marketService),
ValidatorService: services.NewValidatorService(validatorRepository, stakeRepository, cacheService),
UnbondRepository: unbond.NewRepository(db),
StakeService: stake.NewService(stakeRepository),
Expand Down
39 changes: 39 additions & 0 deletions core/ws/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ws

import (
"encoding/json"
"github.com/MinterTeam/minter-explorer-api/v2/blocks"
"github.com/MinterTeam/minter-explorer-api/v2/helpers"
"github.com/centrifugal/centrifuge-go"
)

type BlocksChannelHandler struct {
Subscribers []NewBlockSubscriber
}

type NewBlockSubscriber interface {
OnNewBlock(blocks.Resource)
}

// Constructor for blocks event channel handler
func NewBlocksChannelHandler() *BlocksChannelHandler {
return &BlocksChannelHandler{
Subscribers: make([]NewBlockSubscriber, 0),
}
}

// Add new subscriber for channel
func (b *BlocksChannelHandler) AddSubscriber(sub NewBlockSubscriber) {
b.Subscribers = append(b.Subscribers, sub)
}

// Handle new block from ws and publish him to all subscribers
func (b *BlocksChannelHandler) OnPublish(sub *centrifuge.Subscription, e centrifuge.PublishEvent) {
var block blocks.Resource
err := json.Unmarshal(e.Data, &block)
helpers.CheckErr(err)

for _, sub := range b.Subscribers {
go sub.OnNewBlock(block)
}
}
5 changes: 3 additions & 2 deletions core/extender.go → core/ws/ws.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core
package ws

import (
"github.com/MinterTeam/minter-explorer-api/v2/core"
"github.com/MinterTeam/minter-explorer-api/v2/helpers"
"github.com/centrifugal/centrifuge-go"
)
Expand All @@ -10,7 +11,7 @@ type ExtenderWsClient struct {
}

// create new extender connection
func NewExtenderWsClient(explorer *Explorer) *ExtenderWsClient {
func NewExtenderWsClient(explorer *core.Explorer) *ExtenderWsClient {
c := centrifuge.New(explorer.Environment.WsServer, centrifuge.DefaultConfig())

err := c.Connect()
Expand Down
8 changes: 7 additions & 1 deletion stake/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func (repository Repository) GetAllByAddress(address string) ([]models.Stake, er
// Get total delegated bip value
func (repository Repository) GetSumInBipValue() (string, error) {
var sum string
err := repository.db.Model(&models.Stake{}).ColumnExpr("SUM(bip_value)").Select(&sum)

err := repository.db.Model(&models.Stake{}).
Where("is_kicked = false").
ColumnExpr("SUM(bip_value)").
Select(&sum)

return sum, err
}

Expand All @@ -42,6 +47,7 @@ func (repository Repository) GetSumInBipValueByAddress(address string) (string,
err := repository.db.Model(&models.Stake{}).
Column("OwnerAddress._").
ColumnExpr("SUM(bip_value)").
Where("is_kicked = false").
Where("owner_address.address = ?", address).
Select(&sum)

Expand Down
10 changes: 1 addition & 9 deletions tools/cache/cache.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package cache

import (
"encoding/json"
"github.com/MinterTeam/minter-explorer-api/v2/blocks"
"github.com/MinterTeam/minter-explorer-api/v2/helpers"
"github.com/MinterTeam/minter-explorer-extender/v2/models"
"github.com/centrifugal/centrifuge-go"
"sync"
"time"
)
Expand Down Expand Up @@ -83,11 +80,6 @@ func (c *ExplorerCache) GetLastBlock() blocks.Resource {
}

// update last block id by ws data
func (c *ExplorerCache) OnPublish(sub *centrifuge.Subscription, e centrifuge.PublishEvent) {
var block blocks.Resource
err := json.Unmarshal(e.Data, &block)
helpers.CheckErr(err)

// update last block id
func (c *ExplorerCache) OnNewBlock(block blocks.Resource) {
c.SetLastBlock(block)
}
10 changes: 3 additions & 7 deletions tools/metrics/block.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package metrics

import (
"encoding/json"
"github.com/MinterTeam/minter-explorer-api/v2/blocks"
"github.com/MinterTeam/minter-explorer-api/v2/helpers"
"github.com/centrifugal/centrifuge-go"
"github.com/prometheus/client_golang/prometheus"
"time"
)
Expand All @@ -14,6 +12,7 @@ type LastBlockMetric struct {
time prometheus.Gauge
}

// Constructor for prometheus metrics
func NewLastBlockMetric() *LastBlockMetric {
prometheusLastBlockIdMetric := prometheus.NewGauge(
prometheus.GaugeOpts{
Expand All @@ -36,11 +35,8 @@ func NewLastBlockMetric() *LastBlockMetric {
}
}

func (m *LastBlockMetric) OnPublish(sub *centrifuge.Subscription, e centrifuge.PublishEvent) {
var block blocks.Resource
err := json.Unmarshal(e.Data, &block)
helpers.CheckErr(err)

// Update last block for prometheus metric
func (m *LastBlockMetric) OnNewBlock(block blocks.Resource) {
blockTime, err := time.Parse(time.RFC3339, block.Timestamp)
helpers.CheckErr(err)

Expand Down

0 comments on commit b98af01

Please sign in to comment.