Skip to content
This repository was archived by the owner on Nov 13, 2024. It is now read-only.

Commit

Permalink
isolate checks into its own package
Browse files Browse the repository at this point in the history
  • Loading branch information
blade committed Mar 24, 2024
1 parent cff981c commit da3729b
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type nodeRelayResponse struct {
Error error
}

func sendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string) chan *nodeRelayResponse {
func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string) chan *nodeRelayResponse {
// Define a channel to receive relay responses
relayResponses := make(chan *nodeRelayResponse, len(nodes))
var wg sync.WaitGroup
Expand Down
8 changes: 4 additions & 4 deletions internal/node_selector_service/checks/chain_config_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package checks

import "pokt_gateway_server/internal/chain_configurations_registry"

// getBlockHeightTolerance - helper function to retrieve block height tolerance across checks
func getBlockHeightTolerance(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue int) int {
// GetBlockHeightTolerance - helper function to retrieve block height tolerance across checks
func GetBlockHeightTolerance(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue int) int {
chainConfig, ok := chainConfiguration.GetChainConfiguration(chainId)
if !ok {
return defaultValue
}
return int(*chainConfig.HeightCheckBlockTolerance)
}

// getDataIntegrityHeightLookback - helper function ro retrieve data integrity lookback across checks
func getDataIntegrityHeightLookback(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue int) int {
// GetDataIntegrityHeightLookback - helper function ro retrieve data integrity lookback across checks
func GetDataIntegrityHeightLookback(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue int) int {
chainConfig, ok := chainConfiguration.GetChainConfiguration(chainId)
if !ok {
return defaultValue
Expand Down
4 changes: 2 additions & 2 deletions internal/node_selector_service/checks/error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func isTimeoutError(err error) bool {
return err == fasthttp.ErrTimeout || err == fasthttp.ErrDialTimeout || err == fasthttp.ErrTLSHandshakeTimeout
}

// defaultPunishNode: generic punisher for whenever a node returns an error independent of a specific check
func defaultPunishNode(err error, node *models.QosNode, logger *zap.Logger) bool {
// DefaultPunishNode: generic punisher for whenever a node returns an error independent of a specific check
func DefaultPunishNode(err error, node *models.QosNode, logger *zap.Logger) bool {
if isMaximumRelaysServicedErr(err) {
// 24 hours is analogous to indefinite
node.SetTimeoutUntil(time.Now().Add(time.Hour*24), models.MaximumRelaysTimeout)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package checks
package evm_data_integrity_check

import (
"encoding/json"
"fmt"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
"pokt_gateway_server/internal/node_selector_service/checks"
"pokt_gateway_server/internal/node_selector_service/models"
"pokt_gateway_server/pkg/common"
"strconv"
Expand Down Expand Up @@ -35,12 +36,12 @@ type blockByNumberResponse struct {
}

type EvmDataIntegrityCheck struct {
*Check
*checks.Check
nextCheckTime time.Time
logger *zap.Logger
}

func NewEvmDataIntegrityCheck(check *Check, logger *zap.Logger) *EvmDataIntegrityCheck {
func NewEvmDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *EvmDataIntegrityCheck {
return &EvmDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger}
}

Expand All @@ -54,7 +55,7 @@ func (c *EvmDataIntegrityCheck) Name() string {
}

func (c *EvmDataIntegrityCheck) SetNodes(nodes []*models.QosNode) {
c.nodeList = nodes
c.NodeList = nodes
}

func (c *EvmDataIntegrityCheck) Perform() {
Expand All @@ -64,7 +65,7 @@ func (c *EvmDataIntegrityCheck) Perform() {

// Node that is synced cannot be found, so we cannot run data integrity checks since we need a trusted source
if sourceOfTruth == nil {
c.logger.Sugar().Warnw("cannot find source of truth for data integrity check", "chain", c.nodeList[0].GetChain())
c.logger.Sugar().Warnw("cannot find source of truth for data integrity check", "chain", c.NodeList[0].GetChain())
return
}

Expand All @@ -74,21 +75,21 @@ func (c *EvmDataIntegrityCheck) Perform() {
var nodeResponsePairs []*nodeResponsePair

// find a random block to search that nodes should have access too
blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(getDataIntegrityHeightLookback(c.chainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault))
blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(checks.GetDataIntegrityHeightLookback(c.ChainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault))

nodeResponses := sendRelaysAsync(c.pocketRelayer, c.getEligibleNodes(), getBlockByNumberPayload(blockNumberToSearch), "POST")
nodeResponses := checks.SendRelaysAsync(c.PocketRelayer, c.getEligibleNodes(), getBlockByNumberPayload(blockNumberToSearch), "POST")
for rsp := range nodeResponses {

if rsp.Error != nil {
defaultPunishNode(rsp.Error, rsp.Node, c.logger)
checks.DefaultPunishNode(rsp.Error, rsp.Node, c.logger)
continue
}

var resp blockByNumberResponse
err := json.Unmarshal([]byte(rsp.Relay.Response), &resp)
if err != nil {
c.logger.Sugar().Warnw("failed to unmarshal response", "err", err)
defaultPunishNode(fasthttp.ErrTimeout, rsp.Node, c.logger)
checks.DefaultPunishNode(fasthttp.ErrTimeout, rsp.Node, c.logger)
continue
}

Expand Down Expand Up @@ -133,7 +134,7 @@ func (c *EvmDataIntegrityCheck) ShouldRun() bool {
// findRandomHealthyNode - returns a healthy node that is synced so we can use it as a source of truth for data integrity checks
func (c *EvmDataIntegrityCheck) findRandomHealthyNode() *models.QosNode {
var healthyNodes []*models.QosNode
for _, node := range c.nodeList {
for _, node := range c.NodeList {
if node.IsHealthy() {
healthyNodes = append(healthyNodes, node)
}
Expand All @@ -148,7 +149,7 @@ func (c *EvmDataIntegrityCheck) findRandomHealthyNode() *models.QosNode {
func (c *EvmDataIntegrityCheck) getEligibleNodes() []*models.QosNode {
// Filter nodes based on last checked time
var eligibleNodes []*models.QosNode
for _, node := range c.nodeList {
for _, node := range c.NodeList {
if (node.GetLastDataIntegrityCheckTime().IsZero() || time.Since(node.GetLastDataIntegrityCheckTime()) >= dataIntegrityNodeCheckInterval) && node.IsHealthy() {
eligibleNodes = append(eligibleNodes, node)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package checks
package evm_height_check

import (
"encoding/json"
Expand All @@ -8,6 +8,7 @@ import (
"go.uber.org/zap"
"gonum.org/v1/gonum/stat"
"math"
"pokt_gateway_server/internal/node_selector_service/checks"
"pokt_gateway_server/internal/node_selector_service/models"
"strconv"
"strings"
Expand Down Expand Up @@ -64,12 +65,12 @@ func (r *evmHeightResponse) UnmarshalJSON(data []byte) error {
}

type EvmHeightCheck struct {
*Check
*checks.Check
nextCheckTime time.Time
logger *zap.Logger
}

func NewEvmHeightCheck(check *Check, logger *zap.Logger) *EvmHeightCheck {
func NewEvmHeightCheck(check *checks.Check, logger *zap.Logger) *EvmHeightCheck {
return &EvmHeightCheck{Check: check, nextCheckTime: time.Time{}, logger: logger}
}

Expand All @@ -80,15 +81,15 @@ func (c *EvmHeightCheck) Name() string {
func (c *EvmHeightCheck) Perform() {

// Send request to all nodes
relayResponses := sendRelaysAsync(c.pocketRelayer, c.nodeList, heightJsonPayload, "POST")
relayResponses := checks.SendRelaysAsync(c.PocketRelayer, c.NodeList, heightJsonPayload, "POST")

var nodesResponded []*models.QosNode
// Process relay responses
for resp := range relayResponses {

err := resp.Error
if err != nil {
defaultPunishNode(err, resp.Node, c.logger)
checks.DefaultPunishNode(err, resp.Node, c.logger)
continue
}

Expand All @@ -98,7 +99,7 @@ func (c *EvmHeightCheck) Perform() {
if err != nil {
c.logger.Sugar().Warnw("failed to unmarshal response", "err", err)
// Treat a invalid response as a timeout error
defaultPunishNode(fasthttp.ErrTimeout, resp.Node, c.logger)
checks.DefaultPunishNode(fasthttp.ErrTimeout, resp.Node, c.logger)
continue
}

Expand All @@ -112,7 +113,7 @@ func (c *EvmHeightCheck) Perform() {
for _, node := range nodesResponded {
heightDifference := int(highestNodeHeight - node.GetLastKnownHeight())
// Penalize nodes whose reported height is significantly lower than the highest reported height
if heightDifference > getBlockHeightTolerance(c.chainConfiguration, node.GetChain(), defaultHeightTolerance) {
if heightDifference > checks.GetBlockHeightTolerance(c.ChainConfiguration, node.GetChain(), defaultHeightTolerance) {
c.logger.Sugar().Infow("node is out of sync", "node", node.MorseNode.ServiceUrl, "heightDifference", heightDifference, "nodeSyncedHeight", node.GetLastKnownHeight(), "highestNodeHeight", highestNodeHeight, "chain", node.GetChain())
// Punish Node specifically due to timeout.
node.SetSynced(false)
Expand All @@ -125,7 +126,7 @@ func (c *EvmHeightCheck) Perform() {
}

func (c *EvmHeightCheck) SetNodes(nodes []*models.QosNode) {
c.nodeList = nodes
c.NodeList = nodes
}

func (c *EvmHeightCheck) ShouldRun() bool {
Expand All @@ -135,7 +136,7 @@ func (c *EvmHeightCheck) ShouldRun() bool {
func (c *EvmHeightCheck) getEligibleNodes() []*models.QosNode {
// Filter nodes based on last checked time
var eligibleNodes []*models.QosNode
for _, node := range c.nodeList {
for _, node := range c.NodeList {
if node.GetLastHeightCheckTime().IsZero() || time.Since(node.GetLastHeightCheckTime()) >= checkNodeHeightInterval {
eligibleNodes = append(eligibleNodes, node)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/node_selector_service/checks/qos_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ type CheckJob interface {
}

type Check struct {
nodeList []*qos_models.QosNode
pocketRelayer pokt_v0.PocketRelayer
chainConfiguration chain_configurations_registry.ChainConfigurationsService
NodeList []*qos_models.QosNode
PocketRelayer pokt_v0.PocketRelayer
ChainConfiguration chain_configurations_registry.ChainConfigurationsService
}

func NewCheck(pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService) *Check {
return &Check{pocketRelayer: pocketRelayer}
return &Check{PocketRelayer: pocketRelayer}
}
6 changes: 4 additions & 2 deletions internal/node_selector_service/node_selector_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"go.uber.org/zap"
"pokt_gateway_server/internal/chain_configurations_registry"
"pokt_gateway_server/internal/node_selector_service/checks"
"pokt_gateway_server/internal/node_selector_service/checks/evm_data_integrity_check"
"pokt_gateway_server/internal/node_selector_service/checks/evm_height_check"
"pokt_gateway_server/internal/node_selector_service/models"
"pokt_gateway_server/internal/session_registry"
"pokt_gateway_server/pkg/common"
Expand Down Expand Up @@ -33,8 +35,8 @@ func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryServ

// enabled checks
enabledChecks := []checks.CheckJob{
checks.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")),
checks.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")),
evm_height_check.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")),
evm_data_integrity_check.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")),
}
selectorService := &NodeSelectorClient{
sessionRegistry: sessionRegistry,
Expand Down

0 comments on commit da3729b

Please sign in to comment.