From da3729bc39f3aa4d1a5a200084bc325df6e52777 Mon Sep 17 00:00:00 2001 From: blade Date: Sun, 24 Mar 2024 16:50:39 -0500 Subject: [PATCH] isolate checks into its own package --- .../checks/async_relay_handler.go | 2 +- .../checks/chain_config_handler.go | 8 +++---- .../checks/error_handler.go | 4 ++-- .../evm_data_integrity_check.go | 23 ++++++++++--------- .../evm_height_check.go | 19 +++++++-------- .../node_selector_service/checks/qos_check.go | 8 +++---- .../node_selector_service.go | 6 +++-- 7 files changed, 37 insertions(+), 33 deletions(-) rename internal/node_selector_service/checks/{ => evm_data_integrity_check}/evm_data_integrity_check.go (86%) rename internal/node_selector_service/checks/{ => evm_height_check}/evm_height_check.go (88%) diff --git a/internal/node_selector_service/checks/async_relay_handler.go b/internal/node_selector_service/checks/async_relay_handler.go index 8f385cc..3b72b6a 100644 --- a/internal/node_selector_service/checks/async_relay_handler.go +++ b/internal/node_selector_service/checks/async_relay_handler.go @@ -13,7 +13,7 @@ type nodeRelayResponse struct { Error error } -func sendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string) chan *nodeRelayResponse { +func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string) chan *nodeRelayResponse { // Define a channel to receive relay responses relayResponses := make(chan *nodeRelayResponse, len(nodes)) var wg sync.WaitGroup diff --git a/internal/node_selector_service/checks/chain_config_handler.go b/internal/node_selector_service/checks/chain_config_handler.go index eb35dce..faf2c54 100644 --- a/internal/node_selector_service/checks/chain_config_handler.go +++ b/internal/node_selector_service/checks/chain_config_handler.go @@ -2,8 +2,8 @@ package checks import "pokt_gateway_server/internal/chain_configurations_registry" -// getBlockHeightTolerance - helper function to retrieve block height tolerance across checks -func getBlockHeightTolerance(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue int) int { +// GetBlockHeightTolerance - helper function to retrieve block height tolerance across checks +func GetBlockHeightTolerance(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue int) int { chainConfig, ok := chainConfiguration.GetChainConfiguration(chainId) if !ok { return defaultValue @@ -11,8 +11,8 @@ func getBlockHeightTolerance(chainConfiguration chain_configurations_registry.Ch return int(*chainConfig.HeightCheckBlockTolerance) } -// getDataIntegrityHeightLookback - helper function ro retrieve data integrity lookback across checks -func getDataIntegrityHeightLookback(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue int) int { +// GetDataIntegrityHeightLookback - helper function ro retrieve data integrity lookback across checks +func GetDataIntegrityHeightLookback(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue int) int { chainConfig, ok := chainConfiguration.GetChainConfiguration(chainId) if !ok { return defaultValue diff --git a/internal/node_selector_service/checks/error_handler.go b/internal/node_selector_service/checks/error_handler.go index fcdd569..50208d9 100644 --- a/internal/node_selector_service/checks/error_handler.go +++ b/internal/node_selector_service/checks/error_handler.go @@ -25,8 +25,8 @@ func isTimeoutError(err error) bool { return err == fasthttp.ErrTimeout || err == fasthttp.ErrDialTimeout || err == fasthttp.ErrTLSHandshakeTimeout } -// defaultPunishNode: generic punisher for whenever a node returns an error independent of a specific check -func defaultPunishNode(err error, node *models.QosNode, logger *zap.Logger) bool { +// DefaultPunishNode: generic punisher for whenever a node returns an error independent of a specific check +func DefaultPunishNode(err error, node *models.QosNode, logger *zap.Logger) bool { if isMaximumRelaysServicedErr(err) { // 24 hours is analogous to indefinite node.SetTimeoutUntil(time.Now().Add(time.Hour*24), models.MaximumRelaysTimeout) diff --git a/internal/node_selector_service/checks/evm_data_integrity_check.go b/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go similarity index 86% rename from internal/node_selector_service/checks/evm_data_integrity_check.go rename to internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go index eed98e7..53b81a6 100644 --- a/internal/node_selector_service/checks/evm_data_integrity_check.go +++ b/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go @@ -1,10 +1,11 @@ -package checks +package evm_data_integrity_check import ( "encoding/json" "fmt" "github.com/valyala/fasthttp" "go.uber.org/zap" + "pokt_gateway_server/internal/node_selector_service/checks" "pokt_gateway_server/internal/node_selector_service/models" "pokt_gateway_server/pkg/common" "strconv" @@ -35,12 +36,12 @@ type blockByNumberResponse struct { } type EvmDataIntegrityCheck struct { - *Check + *checks.Check nextCheckTime time.Time logger *zap.Logger } -func NewEvmDataIntegrityCheck(check *Check, logger *zap.Logger) *EvmDataIntegrityCheck { +func NewEvmDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *EvmDataIntegrityCheck { return &EvmDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} } @@ -54,7 +55,7 @@ func (c *EvmDataIntegrityCheck) Name() string { } func (c *EvmDataIntegrityCheck) SetNodes(nodes []*models.QosNode) { - c.nodeList = nodes + c.NodeList = nodes } func (c *EvmDataIntegrityCheck) Perform() { @@ -64,7 +65,7 @@ func (c *EvmDataIntegrityCheck) Perform() { // Node that is synced cannot be found, so we cannot run data integrity checks since we need a trusted source if sourceOfTruth == nil { - c.logger.Sugar().Warnw("cannot find source of truth for data integrity check", "chain", c.nodeList[0].GetChain()) + c.logger.Sugar().Warnw("cannot find source of truth for data integrity check", "chain", c.NodeList[0].GetChain()) return } @@ -74,13 +75,13 @@ func (c *EvmDataIntegrityCheck) Perform() { var nodeResponsePairs []*nodeResponsePair // find a random block to search that nodes should have access too - blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(getDataIntegrityHeightLookback(c.chainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault)) + blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(checks.GetDataIntegrityHeightLookback(c.ChainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault)) - nodeResponses := sendRelaysAsync(c.pocketRelayer, c.getEligibleNodes(), getBlockByNumberPayload(blockNumberToSearch), "POST") + nodeResponses := checks.SendRelaysAsync(c.PocketRelayer, c.getEligibleNodes(), getBlockByNumberPayload(blockNumberToSearch), "POST") for rsp := range nodeResponses { if rsp.Error != nil { - defaultPunishNode(rsp.Error, rsp.Node, c.logger) + checks.DefaultPunishNode(rsp.Error, rsp.Node, c.logger) continue } @@ -88,7 +89,7 @@ func (c *EvmDataIntegrityCheck) Perform() { err := json.Unmarshal([]byte(rsp.Relay.Response), &resp) if err != nil { c.logger.Sugar().Warnw("failed to unmarshal response", "err", err) - defaultPunishNode(fasthttp.ErrTimeout, rsp.Node, c.logger) + checks.DefaultPunishNode(fasthttp.ErrTimeout, rsp.Node, c.logger) continue } @@ -133,7 +134,7 @@ func (c *EvmDataIntegrityCheck) ShouldRun() bool { // findRandomHealthyNode - returns a healthy node that is synced so we can use it as a source of truth for data integrity checks func (c *EvmDataIntegrityCheck) findRandomHealthyNode() *models.QosNode { var healthyNodes []*models.QosNode - for _, node := range c.nodeList { + for _, node := range c.NodeList { if node.IsHealthy() { healthyNodes = append(healthyNodes, node) } @@ -148,7 +149,7 @@ func (c *EvmDataIntegrityCheck) findRandomHealthyNode() *models.QosNode { func (c *EvmDataIntegrityCheck) getEligibleNodes() []*models.QosNode { // Filter nodes based on last checked time var eligibleNodes []*models.QosNode - for _, node := range c.nodeList { + for _, node := range c.NodeList { if (node.GetLastDataIntegrityCheckTime().IsZero() || time.Since(node.GetLastDataIntegrityCheckTime()) >= dataIntegrityNodeCheckInterval) && node.IsHealthy() { eligibleNodes = append(eligibleNodes, node) } diff --git a/internal/node_selector_service/checks/evm_height_check.go b/internal/node_selector_service/checks/evm_height_check/evm_height_check.go similarity index 88% rename from internal/node_selector_service/checks/evm_height_check.go rename to internal/node_selector_service/checks/evm_height_check/evm_height_check.go index 85ad3ca..1368239 100644 --- a/internal/node_selector_service/checks/evm_height_check.go +++ b/internal/node_selector_service/checks/evm_height_check/evm_height_check.go @@ -1,4 +1,4 @@ -package checks +package evm_height_check import ( "encoding/json" @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" "gonum.org/v1/gonum/stat" "math" + "pokt_gateway_server/internal/node_selector_service/checks" "pokt_gateway_server/internal/node_selector_service/models" "strconv" "strings" @@ -64,12 +65,12 @@ func (r *evmHeightResponse) UnmarshalJSON(data []byte) error { } type EvmHeightCheck struct { - *Check + *checks.Check nextCheckTime time.Time logger *zap.Logger } -func NewEvmHeightCheck(check *Check, logger *zap.Logger) *EvmHeightCheck { +func NewEvmHeightCheck(check *checks.Check, logger *zap.Logger) *EvmHeightCheck { return &EvmHeightCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} } @@ -80,7 +81,7 @@ func (c *EvmHeightCheck) Name() string { func (c *EvmHeightCheck) Perform() { // Send request to all nodes - relayResponses := sendRelaysAsync(c.pocketRelayer, c.nodeList, heightJsonPayload, "POST") + relayResponses := checks.SendRelaysAsync(c.PocketRelayer, c.NodeList, heightJsonPayload, "POST") var nodesResponded []*models.QosNode // Process relay responses @@ -88,7 +89,7 @@ func (c *EvmHeightCheck) Perform() { err := resp.Error if err != nil { - defaultPunishNode(err, resp.Node, c.logger) + checks.DefaultPunishNode(err, resp.Node, c.logger) continue } @@ -98,7 +99,7 @@ func (c *EvmHeightCheck) Perform() { if err != nil { c.logger.Sugar().Warnw("failed to unmarshal response", "err", err) // Treat a invalid response as a timeout error - defaultPunishNode(fasthttp.ErrTimeout, resp.Node, c.logger) + checks.DefaultPunishNode(fasthttp.ErrTimeout, resp.Node, c.logger) continue } @@ -112,7 +113,7 @@ func (c *EvmHeightCheck) Perform() { for _, node := range nodesResponded { heightDifference := int(highestNodeHeight - node.GetLastKnownHeight()) // Penalize nodes whose reported height is significantly lower than the highest reported height - if heightDifference > getBlockHeightTolerance(c.chainConfiguration, node.GetChain(), defaultHeightTolerance) { + if heightDifference > checks.GetBlockHeightTolerance(c.ChainConfiguration, node.GetChain(), defaultHeightTolerance) { c.logger.Sugar().Infow("node is out of sync", "node", node.MorseNode.ServiceUrl, "heightDifference", heightDifference, "nodeSyncedHeight", node.GetLastKnownHeight(), "highestNodeHeight", highestNodeHeight, "chain", node.GetChain()) // Punish Node specifically due to timeout. node.SetSynced(false) @@ -125,7 +126,7 @@ func (c *EvmHeightCheck) Perform() { } func (c *EvmHeightCheck) SetNodes(nodes []*models.QosNode) { - c.nodeList = nodes + c.NodeList = nodes } func (c *EvmHeightCheck) ShouldRun() bool { @@ -135,7 +136,7 @@ func (c *EvmHeightCheck) ShouldRun() bool { func (c *EvmHeightCheck) getEligibleNodes() []*models.QosNode { // Filter nodes based on last checked time var eligibleNodes []*models.QosNode - for _, node := range c.nodeList { + for _, node := range c.NodeList { if node.GetLastHeightCheckTime().IsZero() || time.Since(node.GetLastHeightCheckTime()) >= checkNodeHeightInterval { eligibleNodes = append(eligibleNodes, node) } diff --git a/internal/node_selector_service/checks/qos_check.go b/internal/node_selector_service/checks/qos_check.go index a0ed050..1556f84 100644 --- a/internal/node_selector_service/checks/qos_check.go +++ b/internal/node_selector_service/checks/qos_check.go @@ -14,11 +14,11 @@ type CheckJob interface { } type Check struct { - nodeList []*qos_models.QosNode - pocketRelayer pokt_v0.PocketRelayer - chainConfiguration chain_configurations_registry.ChainConfigurationsService + NodeList []*qos_models.QosNode + PocketRelayer pokt_v0.PocketRelayer + ChainConfiguration chain_configurations_registry.ChainConfigurationsService } func NewCheck(pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService) *Check { - return &Check{pocketRelayer: pocketRelayer} + return &Check{PocketRelayer: pocketRelayer} } diff --git a/internal/node_selector_service/node_selector_service.go b/internal/node_selector_service/node_selector_service.go index 9b0ee39..8944a04 100644 --- a/internal/node_selector_service/node_selector_service.go +++ b/internal/node_selector_service/node_selector_service.go @@ -4,6 +4,8 @@ import ( "go.uber.org/zap" "pokt_gateway_server/internal/chain_configurations_registry" "pokt_gateway_server/internal/node_selector_service/checks" + "pokt_gateway_server/internal/node_selector_service/checks/evm_data_integrity_check" + "pokt_gateway_server/internal/node_selector_service/checks/evm_height_check" "pokt_gateway_server/internal/node_selector_service/models" "pokt_gateway_server/internal/session_registry" "pokt_gateway_server/pkg/common" @@ -33,8 +35,8 @@ func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryServ // enabled checks enabledChecks := []checks.CheckJob{ - checks.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")), - checks.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")), + evm_height_check.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")), + evm_data_integrity_check.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")), } selectorService := &NodeSelectorClient{ sessionRegistry: sessionRegistry,