From 6ad738f78dd5dfacbdf986237cdd0fd9a54bb0a4 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 20 Sep 2024 08:40:01 -0400 Subject: [PATCH] MultiNode Integration: Initial Setup (#824) * MultiNode integration setup * Update MultiNode files * Add MultiNode flag * Remove internal dependency * Fix build * Fix import cycle * tidy * Update client_test.go * lint * Fix duplicate metrics * Add chain multinode flag * Extend client * Address comments * lint * Fix lint overflow issues * Update transaction_sender.go * Fix lint * Validate node config * Update toml.go * Add SendOnly nodes * Use test context * lint --- go.mod | 2 +- pkg/solana/chain.go | 92 ++- pkg/solana/chain_test.go | 3 +- pkg/solana/client/client.go | 82 ++- pkg/solana/client/client_test.go | 9 +- pkg/solana/client/mocks/ReaderWriter.go | 23 +- pkg/solana/client/multinode/ctx.go | 17 + pkg/solana/client/multinode/models.go | 121 +++ pkg/solana/client/multinode/multi_node.go | 379 ++++++++++ pkg/solana/client/multinode/node.go | 328 +++++++++ pkg/solana/client/multinode/node_fsm.go | 370 ++++++++++ pkg/solana/client/multinode/node_lifecycle.go | 687 ++++++++++++++++++ pkg/solana/client/multinode/node_selector.go | 41 ++ .../multinode/node_selector_highest_head.go | 36 + .../multinode/node_selector_priority_level.go | 121 +++ .../multinode/node_selector_round_robin.go | 46 ++ .../node_selector_total_difficulty.go | 51 ++ pkg/solana/client/multinode/poller.go | 93 +++ pkg/solana/client/multinode/redialbackoff.go | 17 + pkg/solana/client/multinode/send_only_node.go | 181 +++++ .../multinode/send_only_node_lifecycle.go | 65 ++ .../client/multinode/transaction_sender.go | 276 +++++++ pkg/solana/client/multinode/types.go | 108 +++ pkg/solana/cmd/chainlink-solana/main.go | 2 + pkg/solana/config/config.go | 5 +- pkg/solana/config/multinode.go | 87 +++ pkg/solana/config/toml.go | 9 +- 27 files changed, 3222 insertions(+), 29 deletions(-) create mode 100644 pkg/solana/client/multinode/ctx.go create mode 100644 pkg/solana/client/multinode/models.go create mode 100644 pkg/solana/client/multinode/multi_node.go create mode 100644 pkg/solana/client/multinode/node.go create mode 100644 pkg/solana/client/multinode/node_fsm.go create mode 100644 pkg/solana/client/multinode/node_lifecycle.go create mode 100644 pkg/solana/client/multinode/node_selector.go create mode 100644 pkg/solana/client/multinode/node_selector_highest_head.go create mode 100644 pkg/solana/client/multinode/node_selector_priority_level.go create mode 100644 pkg/solana/client/multinode/node_selector_round_robin.go create mode 100644 pkg/solana/client/multinode/node_selector_total_difficulty.go create mode 100644 pkg/solana/client/multinode/poller.go create mode 100644 pkg/solana/client/multinode/redialbackoff.go create mode 100644 pkg/solana/client/multinode/send_only_node.go create mode 100644 pkg/solana/client/multinode/send_only_node_lifecycle.go create mode 100644 pkg/solana/client/multinode/transaction_sender.go create mode 100644 pkg/solana/client/multinode/types.go create mode 100644 pkg/solana/config/multinode.go diff --git a/go.mod b/go.mod index 2f608af84..cb7a616c0 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.1.0 github.com/google/uuid v1.6.0 github.com/hashicorp/go-plugin v1.6.2-0.20240829161738-06afb6d7ae99 + github.com/jpillora/backoff v1.0.0 github.com/pelletier/go-toml/v2 v2.2.0 github.com/prometheus/client_golang v1.17.0 github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913191949-44d96950c886 @@ 
-58,7 +59,6 @@ require ( github.com/hashicorp/go-hclog v1.5.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/invopop/jsonschema v0.12.0 // indirect - github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.15.15 // indirect github.com/kr/pretty v0.3.1 // indirect diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index 86545b0de..8b4ad5787 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "math/big" "math/rand" "strconv" @@ -22,6 +23,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/core" + mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" @@ -85,6 +88,10 @@ type chain struct { balanceMonitor services.Service lggr logger.Logger + // if multiNode is enabled, the clientCache will not be used + multiNode *mn.MultiNode[mn.StringID, *client.Client] + txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.Client] + // tracking node chain id for verification clientCache map[string]*verifiedCachedClient // map URL -> {client, chainId} [mainnet/testnet/devnet/localnet] clientLock sync.RWMutex @@ -114,7 +121,8 @@ func (v *verifiedCachedClient) verifyChainID() (bool, error) { v.chainIDVerifiedLock.Lock() defer v.chainIDVerifiedLock.Unlock() - v.chainID, err = v.ReaderWriter.ChainID() + strID, err := v.ReaderWriter.ChainID(context.Background()) + v.chainID = strID.String() if err != nil { v.chainIDVerified = false return v.chainIDVerified, fmt.Errorf("failed to fetch ChainID in verifiedCachedClient: %w", err) @@ -186,13 +194,13 @@ func (v *verifiedCachedClient) LatestBlockhash() (*rpc.GetLatestBlockhashResult, return v.ReaderWriter.LatestBlockhash() } -func (v *verifiedCachedClient) ChainID() (string, error) { +func (v *verifiedCachedClient) ChainID(ctx context.Context) (mn.StringID, error) { verified, err := v.verifyChainID() if !verified { return "", err } - return v.chainID, nil + return mn.StringID(v.chainID), nil } func (v *verifiedCachedClient) GetFeeForMessage(msg string) (uint64, error) { @@ -221,6 +229,66 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L lggr: logger.Named(lggr, "Chain"), clientCache: map[string]*verifiedCachedClient{}, } + + if cfg.MultiNodeEnabled() { + chainFamily := "solana" + + mnCfg := cfg.MultiNodeConfig() + + var nodes []mn.Node[mn.StringID, *client.Client] + var sendOnlyNodes []mn.SendOnlyNode[mn.StringID, *client.Client] + + for i, nodeInfo := range cfg.ListNodes() { + rpcClient, err := client.NewClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name)) + if err != nil { + lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error()) + return nil, fmt.Errorf("failed to create client: %w", err) + } + + newNode := mn.NewNode[mn.StringID, *client.Head, *client.Client]( + mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name, + i, mn.StringID(id), 0, rpcClient, chainFamily) + + if nodeInfo.SendOnly { + sendOnlyNodes = append(sendOnlyNodes, newNode) + } else { + nodes = append(nodes, newNode) + } + } + + multiNode := mn.NewMultiNode[mn.StringID, 
*client.Client]( + lggr, + mn.NodeSelectionModeRoundRobin, + 0, + nodes, + sendOnlyNodes, + mn.StringID(id), + chainFamily, + mnCfg.DeathDeclarationDelay(), + ) + + // TODO: implement error classification; move logic to separate file if large + // TODO: might be useful to reference anza-xyz/agave@master/sdk/src/transaction/error.rs + classifySendError := func(tx *solanago.Transaction, err error) mn.SendTxReturnCode { + return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false) + } + + txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.Client]( + lggr, + mn.StringID(id), + chainFamily, + multiNode, + classifySendError, + 0, // use the default value provided by the implementation + ) + + ch.multiNode = multiNode + ch.txSender = txSender + + // clientCache will not be used if multinode is enabled + ch.clientCache = nil + } + tc := func() (client.ReaderWriter, error) { return ch.getClient() } @@ -330,6 +398,10 @@ func (c *chain) ChainID() string { // getClient returns a client, randomly selecting one from available and valid nodes func (c *chain) getClient() (client.ReaderWriter, error) { + if c.cfg.MultiNodeEnabled() { + return c.multiNode.SelectRPC() + } + var node *config.Node var client client.ReaderWriter nodes := c.cfg.ListNodes() @@ -409,7 +481,12 @@ func (c *chain) Start(ctx context.Context) error { c.lggr.Debug("Starting txm") c.lggr.Debug("Starting balance monitor") var ms services.MultiStart - return ms.Start(ctx, c.txm, c.balanceMonitor) + startAll := []services.StartClose{c.txm, c.balanceMonitor} + if c.cfg.MultiNodeEnabled() { + c.lggr.Debug("Starting multinode") + startAll = append(startAll, c.multiNode, c.txSender) + } + return ms.Start(ctx, startAll...) }) } @@ -418,7 +495,12 @@ func (c *chain) Close() error { c.lggr.Debug("Stopping") c.lggr.Debug("Stopping txm") c.lggr.Debug("Stopping balance monitor") - return services.CloseAll(c.txm, c.balanceMonitor) + closeAll := []io.Closer{c.txm, c.balanceMonitor} + if c.cfg.MultiNodeEnabled() { + c.lggr.Debug("Stopping multinode") + closeAll = append(closeAll, c.multiNode, c.txSender) + } + return services.CloseAll(closeAll...) 
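As a reference point for the error-classification TODO in newChain above, the sketch below shows the shape of a minimal placeholder classifier built only from the generic codes in client/multinode/models.go; it is not the patch's implementation, and a real classifier would inspect Solana-specific RPC errors (e.g. the agave transaction errors the TODO references).

// Hypothetical placeholder only; mirrors the closure passed to NewTransactionSender.
classifySendError := func(tx *solanago.Transaction, err error) mn.SendTxReturnCode {
	if err == nil {
		return mn.Successful
	}
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return mn.Retryable // transient: the same attempt may succeed on retry
	}
	return mn.Unknown // unrecognized RPC error; classified conservatively
}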
}) } diff --git a/pkg/solana/chain_test.go b/pkg/solana/chain_test.go index aa52b8b4d..6fb966740 100644 --- a/pkg/solana/chain_test.go +++ b/pkg/solana/chain_test.go @@ -16,6 +16,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" solcfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" @@ -174,7 +175,7 @@ func TestSolanaChain_VerifiedClient(t *testing.T) { testChain.id = "incorrect" c, err = testChain.verifiedClient(node) assert.NoError(t, err) - _, err = c.ChainID() + _, err = c.ChainID(tests.Context(t)) // expect error from id mismatch (even if using a cached client) when performing RPC calls assert.Error(t, err) assert.Equal(t, fmt.Sprintf("client returned mismatched chain id (expected: %s, got: %s): %s", "incorrect", "devnet", node.URL), err.Error()) diff --git a/pkg/solana/client/client.go b/pkg/solana/client/client.go index e51c93837..785e7e508 100644 --- a/pkg/solana/client/client.go +++ b/pkg/solana/client/client.go @@ -4,8 +4,11 @@ import ( "context" "errors" "fmt" + "math/big" "time" + mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" + "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" "golang.org/x/sync/singleflight" @@ -33,7 +36,7 @@ type Reader interface { Balance(addr solana.PublicKey) (uint64, error) SlotHeight() (uint64, error) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) - ChainID() (string, error) + ChainID(ctx context.Context) (mn.StringID, error) GetFeeForMessage(msg string) (uint64, error) GetLatestBlock() (*rpc.GetBlockResult, error) } @@ -65,6 +68,27 @@ type Client struct { requestGroup *singleflight.Group } +type Head struct { + rpc.GetBlockResult +} + +func (h *Head) BlockNumber() int64 { + if !h.IsValid() { + return 0 + } + // nolint:gosec + // G115: integer overflow conversion uint64 -> int64 + return int64(*h.BlockHeight) +} + +func (h *Head) BlockDifficulty() *big.Int { + return nil +} + +func (h *Head) IsValid() bool { + return h.BlockHeight != nil +} + func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) { return &Client{ url: endpoint, @@ -79,6 +103,56 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, }, nil } +var _ mn.RPCClient[mn.StringID, *Head] = (*Client)(nil) +var _ mn.SendTxRPCClient[*solana.Transaction] = (*Client)(nil) + +// TODO: BCI-4061: Implement Client for MultiNode + +func (c *Client) Dial(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (c *Client) SubscribeToHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) { + //TODO implement me + panic("implement me") +} + +func (c *Client) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) { + //TODO implement me + panic("implement me") +} + +func (c *Client) Ping(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (c *Client) IsSyncing(ctx context.Context) (bool, error) { + //TODO implement me + panic("implement me") +} + +func (c *Client) UnsubscribeAllExcept(subs ...mn.Subscription) { + //TODO implement me + panic("implement me") +} + +func (c *Client) Close() { + //TODO implement me + panic("implement me") +} + +func (c *Client) GetInterceptedChainInfo() (latest, highestUserObservations 
mn.ChainInfo) { + //TODO implement me + panic("implement me") +} + +func (c *Client) SendTransaction(ctx context.Context, tx *solana.Transaction) error { + // TODO: Implement + return nil +} + func (c *Client) latency(name string) func() { start := time.Now() return func() { @@ -142,11 +216,11 @@ func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) { return v.(*rpc.GetLatestBlockhashResult), err } -func (c *Client) ChainID() (string, error) { +func (c *Client) ChainID(ctx context.Context) (mn.StringID, error) { done := c.latency("chain_id") defer done() - ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) + ctx, cancel := context.WithTimeout(ctx, c.contextDuration) defer cancel() v, err, _ := c.requestGroup.Do("GetGenesisHash", func() (interface{}, error) { return c.rpc.GetGenesisHash(ctx) @@ -168,7 +242,7 @@ func (c *Client) ChainID() (string, error) { c.log.Warnf("unknown genesis hash - assuming solana chain is 'localnet'") network = "localnet" } - return network, nil + return mn.StringID(network), nil } func (c *Client) GetFeeForMessage(msg string) (uint64, error) { diff --git a/pkg/solana/client/client_test.go b/pkg/solana/client/client_test.go index ab9dba263..6a4feb61f 100644 --- a/pkg/solana/client/client_test.go +++ b/pkg/solana/client/client_test.go @@ -20,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" + mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" ) @@ -76,9 +77,9 @@ func TestClient_Reader_Integration(t *testing.T) { assert.Equal(t, uint64(5000), fee) // get chain ID based on gensis hash - network, err := c.ChainID() + network, err := c.ChainID(context.Background()) assert.NoError(t, err) - assert.Equal(t, "localnet", network) + assert.Equal(t, mn.StringID("localnet"), network) // get account info (also tested inside contract_test) res, err := c.GetAccountInfoWithOpts(context.TODO(), solana.PublicKey{}, &rpc.GetAccountInfoOpts{Commitment: rpc.CommitmentFinalized}) @@ -120,9 +121,9 @@ func TestClient_Reader_ChainID(t *testing.T) { // get chain ID based on gensis hash for _, n := range networks { - network, err := c.ChainID() + network, err := c.ChainID(context.Background()) assert.NoError(t, err) - assert.Equal(t, n, network) + assert.Equal(t, mn.StringID(n), network) } } diff --git a/pkg/solana/client/mocks/ReaderWriter.go b/pkg/solana/client/mocks/ReaderWriter.go index 2bbb82fef..b6cd6808a 100644 --- a/pkg/solana/client/mocks/ReaderWriter.go +++ b/pkg/solana/client/mocks/ReaderWriter.go @@ -6,6 +6,7 @@ import ( context "context" rpc "github.com/gagliardetto/solana-go/rpc" + multinode "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" mock "github.com/stretchr/testify/mock" solana "github.com/gagliardetto/solana-go" @@ -44,27 +45,27 @@ func (_m *ReaderWriter) Balance(addr solana.PublicKey) (uint64, error) { return r0, r1 } -// ChainID provides a mock function with given fields: -func (_m *ReaderWriter) ChainID() (string, error) { - ret := _m.Called() +// ChainID provides a mock function with given fields: ctx +func (_m *ReaderWriter) ChainID(ctx context.Context) (multinode.StringID, error) { + ret := _m.Called(ctx) if len(ret) == 0 { panic("no return value specified for ChainID") } - var r0 string + var r0 multinode.StringID var r1 error - if rf, ok := ret.Get(0).(func() (string, error)); ok { - return rf() + if rf, ok 
:= ret.Get(0).(func(context.Context) (multinode.StringID, error)); ok { + return rf(ctx) } - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(context.Context) multinode.StringID); ok { + r0 = rf(ctx) } else { - r0 = ret.Get(0).(string) + r0 = ret.Get(0).(multinode.StringID) } - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) } else { r1 = ret.Error(1) } diff --git a/pkg/solana/client/multinode/ctx.go b/pkg/solana/client/multinode/ctx.go new file mode 100644 index 000000000..57b2fc8a8 --- /dev/null +++ b/pkg/solana/client/multinode/ctx.go @@ -0,0 +1,17 @@ +package client + +import "context" + +type multiNodeContextKey int + +const ( + contextKeyHeathCheckRequest multiNodeContextKey = iota + 1 +) + +func CtxAddHealthCheckFlag(ctx context.Context) context.Context { + return context.WithValue(ctx, contextKeyHeathCheckRequest, struct{}{}) +} + +func CtxIsHeathCheckRequest(ctx context.Context) bool { + return ctx.Value(contextKeyHeathCheckRequest) != nil +} diff --git a/pkg/solana/client/multinode/models.go b/pkg/solana/client/multinode/models.go new file mode 100644 index 000000000..526bb25c8 --- /dev/null +++ b/pkg/solana/client/multinode/models.go @@ -0,0 +1,121 @@ +package client + +import ( + "bytes" + "fmt" +) + +type SendTxReturnCode int + +// SendTxReturnCode is a generalized client error that dictates what should be the next action, depending on the RPC error response. +const ( + Successful SendTxReturnCode = iota + 1 + Fatal // Unrecoverable error. Most likely the attempt should be thrown away. + Retryable // The error returned by the RPC indicates that if we retry with the same attempt, the tx will eventually go through. + Underpriced // Attempt was underpriced. New estimation is needed with bumped gas price. + Unknown // Tx failed with an error response that is not recognized by the client. + Unsupported // Attempt failed with an error response that is not supported by the client for the given chain. + TransactionAlreadyKnown // The transaction that was sent has already been received by the RPC. + InsufficientFunds // Tx was rejected due to insufficient funds. + ExceedsMaxFee // Attempt's fee was higher than the node's limit and got rejected. + FeeOutOfValidRange // This error is returned when we use a fee price suggested from an RPC, but the network rejects the attempt due to an invalid range(mostly used by L2 chains). Retry by requesting a new suggested fee price. + TerminallyStuck // The error returned when a transaction is or could get terminally stuck in the mempool without any chance of inclusion. + sendTxReturnCodeLen // tracks the number of errors. 
Must always be last +) + +// sendTxSevereErrors - error codes which signal that transaction would never be accepted in its current form by the node +var sendTxSevereErrors = []SendTxReturnCode{Fatal, Underpriced, Unsupported, ExceedsMaxFee, FeeOutOfValidRange, Unknown} + +// sendTxSuccessfulCodes - error codes which signal that transaction was accepted by the node +var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown} + +func (c SendTxReturnCode) String() string { + switch c { + case Successful: + return "Successful" + case Fatal: + return "Fatal" + case Retryable: + return "Retryable" + case Underpriced: + return "Underpriced" + case Unknown: + return "Unknown" + case Unsupported: + return "Unsupported" + case TransactionAlreadyKnown: + return "TransactionAlreadyKnown" + case InsufficientFunds: + return "InsufficientFunds" + case ExceedsMaxFee: + return "ExceedsMaxFee" + case FeeOutOfValidRange: + return "FeeOutOfValidRange" + case TerminallyStuck: + return "TerminallyStuck" + default: + return fmt.Sprintf("SendTxReturnCode(%d)", c) + } +} + +type NodeTier int + +const ( + Primary = NodeTier(iota) + Secondary +) + +func (n NodeTier) String() string { + switch n { + case Primary: + return "primary" + case Secondary: + return "secondary" + default: + return fmt.Sprintf("NodeTier(%d)", n) + } +} + +// syncStatus - defines problems related to RPC's state synchronization. Can be used as a bitmask to define multiple issues +type syncStatus int + +const ( + // syncStatusSynced - RPC is fully synced + syncStatusSynced = 0 + // syncStatusNotInSyncWithPool - RPC is lagging behind the highest block observed within the pool of RPCs + syncStatusNotInSyncWithPool syncStatus = 1 << iota + // syncStatusNoNewHead - RPC failed to produce a new head for too long + syncStatusNoNewHead + // syncStatusNoNewFinalizedHead - RPC failed to produce a new finalized head for too long + syncStatusNoNewFinalizedHead + syncStatusLen +) + +func (s syncStatus) String() string { + if s == syncStatusSynced { + return "Synced" + } + var result bytes.Buffer + for i := syncStatusNotInSyncWithPool; i < syncStatusLen; i = i << 1 { + if i&s == 0 { + continue + } + result.WriteString(i.string()) + result.WriteString(",") + } + result.Truncate(result.Len() - 1) + return result.String() +} + +func (s syncStatus) string() string { + switch s { + case syncStatusNotInSyncWithPool: + return "NotInSyncWithRPCPool" + case syncStatusNoNewHead: + return "NoNewHead" + case syncStatusNoNewFinalizedHead: + return "NoNewFinalizedHead" + default: + return fmt.Sprintf("syncStatus(%d)", s) + } +} diff --git a/pkg/solana/client/multinode/multi_node.go b/pkg/solana/client/multinode/multi_node.go new file mode 100644 index 000000000..1a4846edf --- /dev/null +++ b/pkg/solana/client/multinode/multi_node.go @@ -0,0 +1,379 @@ +package client + +import ( + "context" + "errors" + "fmt" + "math/big" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +var ( + // PromMultiNodeRPCNodeStates reports current RPC node state + PromMultiNodeRPCNodeStates = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "solana_multi_node_states", + Help: "The number of RPC nodes currently in the given state for the given chain", + }, []string{"network", "chainId", "state"}) + ErroringNodeError = fmt.Errorf("no live nodes available") +) + +// 
MultiNode is a generalized multi node client interface that includes methods to interact with different chains. +// It also handles multiple node RPC connections simultaneously. +type MultiNode[ + CHAIN_ID ID, + RPC any, +] struct { + services.StateMachine + primaryNodes []Node[CHAIN_ID, RPC] + sendOnlyNodes []SendOnlyNode[CHAIN_ID, RPC] + chainID CHAIN_ID + lggr logger.SugaredLogger + selectionMode string + nodeSelector NodeSelector[CHAIN_ID, RPC] + leaseDuration time.Duration + leaseTicker *time.Ticker + chainFamily string + reportInterval time.Duration + deathDeclarationDelay time.Duration + + activeMu sync.RWMutex + activeNode Node[CHAIN_ID, RPC] + + chStop services.StopChan + wg sync.WaitGroup +} + +func NewMultiNode[ + CHAIN_ID ID, + RPC any, +]( + lggr logger.Logger, + selectionMode string, // type of the "best" RPC selector (e.g HighestHead, RoundRobin, etc.) + leaseDuration time.Duration, // defines interval on which new "best" RPC should be selected + primaryNodes []Node[CHAIN_ID, RPC], + sendOnlyNodes []SendOnlyNode[CHAIN_ID, RPC], + chainID CHAIN_ID, // configured chain ID (used to verify that passed primaryNodes belong to the same chain) + chainFamily string, // name of the chain family - used in the metrics + deathDeclarationDelay time.Duration, +) *MultiNode[CHAIN_ID, RPC] { + nodeSelector := newNodeSelector(selectionMode, primaryNodes) + // Prometheus' default interval is 15s, set this to under 7.5s to avoid + // aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency) + const reportInterval = 6500 * time.Millisecond + c := &MultiNode[CHAIN_ID, RPC]{ + primaryNodes: primaryNodes, + sendOnlyNodes: sendOnlyNodes, + chainID: chainID, + lggr: logger.Sugared(lggr).Named("MultiNode").With("chainID", chainID.String()), + selectionMode: selectionMode, + nodeSelector: nodeSelector, + chStop: make(services.StopChan), + leaseDuration: leaseDuration, + chainFamily: chainFamily, + reportInterval: reportInterval, + deathDeclarationDelay: deathDeclarationDelay, + } + + c.lggr.Debugf("The MultiNode is configured to use NodeSelectionMode: %s", selectionMode) + + return c +} + +func (c *MultiNode[CHAIN_ID, RPC]) ChainID() CHAIN_ID { + return c.chainID +} + +func (c *MultiNode[CHAIN_ID, RPC]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC, isSendOnly bool)) error { + var err error + ok := c.IfNotStopped(func() { + ctx, _ = c.chStop.Ctx(ctx) + + callsCompleted := 0 + for _, n := range c.primaryNodes { + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + if n.State() != NodeStateAlive { + continue + } + do(ctx, n.RPC(), false) + callsCompleted++ + } + } + if callsCompleted == 0 { + err = ErroringNodeError + } + + for _, n := range c.sendOnlyNodes { + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + if n.State() != NodeStateAlive { + continue + } + do(ctx, n.RPC(), true) + } + } + }) + if !ok { + return errors.New("MultiNode is stopped") + } + return err +} + +func (c *MultiNode[CHAIN_ID, RPC]) NodeStates() map[string]NodeState { + states := map[string]NodeState{} + for _, n := range c.primaryNodes { + states[n.String()] = n.State() + } + for _, n := range c.sendOnlyNodes { + states[n.String()] = n.State() + } + return states +} + +// Start starts every node in the pool +// +// Nodes handle their own redialing and runloops, so this function does not +// return any error if the nodes aren't available +func (c *MultiNode[CHAIN_ID, RPC]) Start(ctx context.Context) error { + return c.StartOnce("MultiNode", func() (merr error) { + if 
len(c.primaryNodes) == 0 { + return fmt.Errorf("no available nodes for chain %s", c.chainID.String()) + } + var ms services.MultiStart + for _, n := range c.primaryNodes { + if n.ConfiguredChainID().String() != c.chainID.String() { + return ms.CloseBecause(fmt.Errorf("node %s has configured chain ID %s which does not match multinode configured chain ID of %s", n.String(), n.ConfiguredChainID().String(), c.chainID.String())) + } + n.SetPoolChainInfoProvider(c) + // node will handle its own redialing and automatic recovery + if err := ms.Start(ctx, n); err != nil { + return err + } + } + for _, s := range c.sendOnlyNodes { + if s.ConfiguredChainID().String() != c.chainID.String() { + return ms.CloseBecause(fmt.Errorf("sendonly node %s has configured chain ID %s which does not match multinode configured chain ID of %s", s.String(), s.ConfiguredChainID().String(), c.chainID.String())) + } + if err := ms.Start(ctx, s); err != nil { + return err + } + } + c.wg.Add(1) + go c.runLoop() + + if c.leaseDuration.Seconds() > 0 && c.selectionMode != NodeSelectionModeRoundRobin { + c.lggr.Infof("The MultiNode will switch to best node every %s", c.leaseDuration.String()) + c.wg.Add(1) + go c.checkLeaseLoop() + } else { + c.lggr.Info("Best node switching is disabled") + } + + return nil + }) +} + +// Close tears down the MultiNode and closes all nodes +func (c *MultiNode[CHAIN_ID, RPC]) Close() error { + return c.StopOnce("MultiNode", func() error { + close(c.chStop) + c.wg.Wait() + + return services.CloseAll(services.MultiCloser(c.primaryNodes), services.MultiCloser(c.sendOnlyNodes)) + }) +} + +// SelectRPC returns an RPC of an active node. If there are no active nodes it returns an error. +// Call this method from your chain-specific client implementation to access any chain-specific rpc calls. +func (c *MultiNode[CHAIN_ID, RPC]) SelectRPC() (rpc RPC, err error) { + n, err := c.selectNode() + if err != nil { + return rpc, err + } + return n.RPC(), nil +} + +// selectNode returns the active Node, if it is still NodeStateAlive, otherwise it selects a new one from the NodeSelector. +func (c *MultiNode[CHAIN_ID, RPC]) selectNode() (node Node[CHAIN_ID, RPC], err error) { + c.activeMu.RLock() + node = c.activeNode + c.activeMu.RUnlock() + if node != nil && node.State() == NodeStateAlive { + return // still alive + } + + // select a new one + c.activeMu.Lock() + defer c.activeMu.Unlock() + node = c.activeNode + if node != nil && node.State() == NodeStateAlive { + return // another goroutine beat us here + } + + if c.activeNode != nil { + c.activeNode.UnsubscribeAllExceptAliveLoop() + } + c.activeNode = c.nodeSelector.Select() + + if c.activeNode == nil { + c.lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name()) + errmsg := fmt.Errorf("no live nodes available for chain %s", c.chainID.String()) + c.SvcErrBuffer.Append(errmsg) + err = ErroringNodeError + } + + return c.activeNode, err +} + +// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync. +// Return highest ChainInfo most recently received by the alive nodes. +// E.g. If Node A's the most recent block is 10 and highest 15 and for Node B it's - 12 and 14. This method will return 12. 
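A short consumer-side sketch of the pool API described above, under the Solana instantiation from chain.go where the RPC type parameter is *client.Client; the helper name and its arguments are assumptions. SelectRPC serves regular reads from one healthy node, DoAll fans a call out to every healthy node (send-only nodes included), and LatestChainInfo reports the highest of the alive nodes' latest observations (e.g. latest blocks of 10 and 12 across two alive nodes yield 12), whereas HighestUserObservations reports the highest ever seen.

// Illustrative helper (not in the patch): broadcast a signed transaction to all healthy nodes.
func broadcast(ctx context.Context, pool *mn.MultiNode[mn.StringID, *client.Client], tx *solanago.Transaction, lggr logger.SugaredLogger) error {
	return pool.DoAll(ctx, func(ctx context.Context, rpc *client.Client, isSendOnly bool) {
		if err := rpc.SendTransaction(ctx, tx); err != nil {
			lggr.Warnw("broadcast failed", "sendOnly", isSendOnly, "err", err)
		}
	})
}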
+func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo() (int, ChainInfo) { + var nLiveNodes int + ch := ChainInfo{ + TotalDifficulty: big.NewInt(0), + } + for _, n := range c.primaryNodes { + if s, nodeChainInfo := n.StateAndLatest(); s == NodeStateAlive { + nLiveNodes++ + ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber) + ch.FinalizedBlockNumber = max(ch.FinalizedBlockNumber, nodeChainInfo.FinalizedBlockNumber) + ch.TotalDifficulty = MaxTotalDifficulty(ch.TotalDifficulty, nodeChainInfo.TotalDifficulty) + } + } + return nLiveNodes, ch +} + +// HighestUserObservations - returns highest ChainInfo ever observed by any user of the MultiNode +func (c *MultiNode[CHAIN_ID, RPC]) HighestUserObservations() ChainInfo { + ch := ChainInfo{ + TotalDifficulty: big.NewInt(0), + } + for _, n := range c.primaryNodes { + nodeChainInfo := n.HighestUserObservations() + ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber) + ch.FinalizedBlockNumber = max(ch.FinalizedBlockNumber, nodeChainInfo.FinalizedBlockNumber) + ch.TotalDifficulty = MaxTotalDifficulty(ch.TotalDifficulty, nodeChainInfo.TotalDifficulty) + } + return ch +} + +func (c *MultiNode[CHAIN_ID, RPC]) checkLease() { + bestNode := c.nodeSelector.Select() + for _, n := range c.primaryNodes { + // Terminate client subscriptions. Services are responsible for reconnecting, which will be routed to the new + // best node. Only terminate connections with more than 1 subscription to account for the aliveLoop subscription + if n.State() == NodeStateAlive && n != bestNode { + c.lggr.Infof("Switching to best node from %q to %q", n.String(), bestNode.String()) + n.UnsubscribeAllExceptAliveLoop() + } + } + + c.activeMu.Lock() + defer c.activeMu.Unlock() + if bestNode != c.activeNode { + if c.activeNode != nil { + c.activeNode.UnsubscribeAllExceptAliveLoop() + } + c.activeNode = bestNode + } +} + +func (c *MultiNode[CHAIN_ID, RPC]) checkLeaseLoop() { + defer c.wg.Done() + c.leaseTicker = time.NewTicker(c.leaseDuration) + defer c.leaseTicker.Stop() + + for { + select { + case <-c.leaseTicker.C: + c.checkLease() + case <-c.chStop: + return + } + } +} + +func (c *MultiNode[CHAIN_ID, RPC]) runLoop() { + defer c.wg.Done() + + nodeStates := make([]nodeWithState, len(c.primaryNodes)) + for i, n := range c.primaryNodes { + nodeStates[i] = nodeWithState{ + Node: n.String(), + State: n.State().String(), + DeadSince: nil, + } + } + + c.report(nodeStates) + + monitor := services.NewTicker(c.reportInterval) + defer monitor.Stop() + + for { + select { + case <-monitor.C: + c.report(nodeStates) + case <-c.chStop: + return + } + } +} + +type nodeWithState struct { + Node string + State string + DeadSince *time.Time +} + +func (c *MultiNode[CHAIN_ID, RPC]) report(nodesStateInfo []nodeWithState) { + start := time.Now() + var dead int + counts := make(map[NodeState]int) + for i, n := range c.primaryNodes { + state := n.State() + counts[state]++ + nodesStateInfo[i].State = state.String() + if state == NodeStateAlive { + nodesStateInfo[i].DeadSince = nil + continue + } + + if nodesStateInfo[i].DeadSince == nil { + nodesStateInfo[i].DeadSince = &start + } + + if start.Sub(*nodesStateInfo[i].DeadSince) >= c.deathDeclarationDelay { + dead++ + } + } + for _, state := range allNodeStates { + count := counts[state] + PromMultiNodeRPCNodeStates.WithLabelValues(c.chainFamily, c.chainID.String(), state.String()).Set(float64(count)) + } + + total := len(c.primaryNodes) + live := total - dead + c.lggr.Tracew(fmt.Sprintf("MultiNode state: %d/%d nodes are alive", live, 
total), "nodeStates", nodesStateInfo) + if total == dead { + rerr := fmt.Errorf("no primary nodes available: 0/%d nodes are alive", total) + c.lggr.Criticalw(rerr.Error(), "nodeStates", nodesStateInfo) + c.SvcErrBuffer.Append(rerr) + } else if dead > 0 { + c.lggr.Errorw(fmt.Sprintf("At least one primary node is dead: %d/%d nodes are alive", live, total), "nodeStates", nodesStateInfo) + } +} diff --git a/pkg/solana/client/multinode/node.go b/pkg/solana/client/multinode/node.go new file mode 100644 index 000000000..afdece741 --- /dev/null +++ b/pkg/solana/client/multinode/node.go @@ -0,0 +1,328 @@ +package client + +import ( + "context" + "errors" + "fmt" + "net/url" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +const QueryTimeout = 10 * time.Second + +var errInvalidChainID = errors.New("invalid chain id") + +var ( + promPoolRPCNodeVerifies = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_verifies", + Help: "The total number of chain ID verifications for the given RPC node", + }, []string{"network", "chainID", "nodeName"}) + promPoolRPCNodeVerifiesFailed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_verifies_failed", + Help: "The total number of failed chain ID verifications for the given RPC node", + }, []string{"network", "chainID", "nodeName"}) + promPoolRPCNodeVerifiesSuccess = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_verifies_success", + Help: "The total number of successful chain ID verifications for the given RPC node", + }, []string{"network", "chainID", "nodeName"}) +) + +type NodeConfig interface { + PollFailureThreshold() uint32 + PollInterval() time.Duration + SelectionMode() string + SyncThreshold() uint32 + NodeIsSyncingEnabled() bool + FinalizedBlockPollInterval() time.Duration + EnforceRepeatableRead() bool + DeathDeclarationDelay() time.Duration +} + +type ChainConfig interface { + NodeNoNewHeadsThreshold() time.Duration + NoNewFinalizedHeadsThreshold() time.Duration + FinalityDepth() uint32 + FinalityTagEnabled() bool + FinalizedBlockOffset() uint32 +} + +type Node[ + CHAIN_ID ID, + RPC any, +] interface { + // State returns most accurate state of the Node on the moment of call. + // While some of the checks may be performed in the background and State may return cached value, critical, like + // `FinalizedBlockOutOfSync`, must be executed upon every call. + State() NodeState + // StateAndLatest returns nodeState with the latest ChainInfo observed by Node during current lifecycle. + StateAndLatest() (NodeState, ChainInfo) + // HighestUserObservations - returns highest ChainInfo ever observed by underlying RPC excluding results of health check requests + HighestUserObservations() ChainInfo + SetPoolChainInfoProvider(PoolChainInfoProvider) + // Name is a unique identifier for this node. 
+ Name() string + // String - returns string representation of the node, useful for debugging (name + URLS used to connect to the RPC) + String() string + RPC() RPC + // UnsubscribeAllExceptAliveLoop - closes all subscriptions except the aliveLoop subscription + UnsubscribeAllExceptAliveLoop() + ConfiguredChainID() CHAIN_ID + // Order - returns priority order configured for the RPC + Order() int32 + // Start - starts health checks + Start(context.Context) error + Close() error +} + +type node[ + CHAIN_ID ID, + HEAD Head, + RPC RPCClient[CHAIN_ID, HEAD], +] struct { + services.StateMachine + lfcLog logger.Logger + name string + id int + chainID CHAIN_ID + nodePoolCfg NodeConfig + chainCfg ChainConfig + order int32 + chainFamily string + + ws url.URL + http *url.URL + + rpc RPC + + stateMu sync.RWMutex // protects state* fields + state NodeState + + poolInfoProvider PoolChainInfoProvider + + stopCh services.StopChan + // wg waits for subsidiary goroutines + wg sync.WaitGroup + + healthCheckSubs []Subscription +} + +func NewNode[ + CHAIN_ID ID, + HEAD Head, + RPC RPCClient[CHAIN_ID, HEAD], +]( + nodeCfg NodeConfig, + chainCfg ChainConfig, + lggr logger.Logger, + wsuri url.URL, + httpuri *url.URL, + name string, + id int, + chainID CHAIN_ID, + nodeOrder int32, + rpc RPC, + chainFamily string, +) Node[CHAIN_ID, RPC] { + n := new(node[CHAIN_ID, HEAD, RPC]) + n.name = name + n.id = id + n.chainID = chainID + n.nodePoolCfg = nodeCfg + n.chainCfg = chainCfg + n.ws = wsuri + n.order = nodeOrder + if httpuri != nil { + n.http = httpuri + } + n.stopCh = make(services.StopChan) + lggr = logger.Named(lggr, "Node") + lggr = logger.With(lggr, + "nodeTier", Primary.String(), + "nodeName", name, + "node", n.String(), + "chainID", chainID, + "nodeOrder", n.order, + ) + n.lfcLog = logger.Named(lggr, "Lifecycle") + n.rpc = rpc + n.chainFamily = chainFamily + return n +} + +func (n *node[CHAIN_ID, HEAD, RPC]) String() string { + s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String()) + if n.http != nil { + s = s + fmt.Sprintf(":%s", n.http.String()) + } + return s +} + +func (n *node[CHAIN_ID, HEAD, RPC]) ConfiguredChainID() (chainID CHAIN_ID) { + return n.chainID +} + +func (n *node[CHAIN_ID, HEAD, RPC]) Name() string { + return n.name +} + +func (n *node[CHAIN_ID, HEAD, RPC]) RPC() RPC { + return n.rpc +} + +// unsubscribeAllExceptAliveLoop is not thread-safe; it should only be called +// while holding the stateMu lock. +func (n *node[CHAIN_ID, HEAD, RPC]) unsubscribeAllExceptAliveLoop() { + n.rpc.UnsubscribeAllExcept(n.healthCheckSubs...) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) UnsubscribeAllExceptAliveLoop() { + n.stateMu.Lock() + defer n.stateMu.Unlock() + n.unsubscribeAllExceptAliveLoop() +} + +func (n *node[CHAIN_ID, HEAD, RPC]) Close() error { + return n.StopOnce(n.name, n.close) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) close() error { + defer func() { + n.wg.Wait() + n.rpc.Close() + }() + + n.stateMu.Lock() + defer n.stateMu.Unlock() + + close(n.stopCh) + n.state = NodeStateClosed + return nil +} + +// Start dials and verifies the node +// Should only be called once in a node's lifecycle +// Return value is necessary to conform to interface but this will never +// actually return an error. +func (n *node[CHAIN_ID, HEAD, RPC]) Start(startCtx context.Context) error { + return n.StartOnce(n.name, func() error { + n.start(startCtx) + return nil + }) +} + +// start initially dials the node and verifies chain ID +// This spins off lifecycle goroutines. +// Not thread-safe. 
+// Node lifecycle is synchronous: only one goroutine should be running at a +// time. +func (n *node[CHAIN_ID, HEAD, RPC]) start(startCtx context.Context) { + if n.state != NodeStateUndialed { + panic(fmt.Sprintf("cannot dial node with state %v", n.state)) + } + + if err := n.rpc.Dial(startCtx); err != nil { + n.lfcLog.Errorw("Dial failed: Node is unreachable", "err", err) + n.declareUnreachable() + return + } + n.setState(NodeStateDialed) + + state := n.verifyConn(startCtx, n.lfcLog) + n.declareState(state) +} + +// verifyChainID checks that connection to the node matches the given chain ID +// Not thread-safe +// Pure verifyChainID: does not mutate node "state" field. +func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lggr logger.Logger) NodeState { + promPoolRPCNodeVerifies.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc() + promFailed := func() { + promPoolRPCNodeVerifiesFailed.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc() + } + + st := n.getCachedState() + switch st { + case NodeStateClosed: + // The node is already closed, and any subsequent transition is invalid. + // To make spotting such transitions a bit easier, return the invalid node state. + return NodeStateLen + case NodeStateDialed, NodeStateOutOfSync, NodeStateInvalidChainID, NodeStateSyncing: + default: + panic(fmt.Sprintf("cannot verify node in state %v", st)) + } + + var chainID CHAIN_ID + var err error + if chainID, err = n.rpc.ChainID(callerCtx); err != nil { + promFailed() + lggr.Errorw("Failed to verify chain ID for node", "err", err, "nodeState", n.getCachedState()) + return NodeStateUnreachable + } else if chainID.String() != n.chainID.String() { + promFailed() + err = fmt.Errorf( + "rpc ChainID doesn't match local chain ID: RPC ID=%s, local ID=%s, node name=%s: %w", + chainID.String(), + n.chainID.String(), + n.name, + errInvalidChainID, + ) + lggr.Errorw("Failed to verify RPC node; remote endpoint returned the wrong chain ID", "err", err, "nodeState", n.getCachedState()) + return NodeStateInvalidChainID + } + + promPoolRPCNodeVerifiesSuccess.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc() + + return NodeStateAlive +} + +// createVerifiedConn - establishes new connection with the RPC and verifies that it's valid: chainID matches, and it's not syncing. +// Returns desired state if one of the verifications fails. Otherwise, returns NodeStateAlive. +func (n *node[CHAIN_ID, HEAD, RPC]) createVerifiedConn(ctx context.Context, lggr logger.Logger) NodeState { + if err := n.rpc.Dial(ctx); err != nil { + n.lfcLog.Errorw("Dial failed: Node is unreachable", "err", err, "nodeState", n.getCachedState()) + return NodeStateUnreachable + } + + return n.verifyConn(ctx, lggr) +} + +// verifyConn - verifies that current connection is valid: chainID matches, and it's not syncing. +// Returns desired state if one of the verifications fails. Otherwise, returns NodeStateAlive. 
+func (n *node[CHAIN_ID, HEAD, RPC]) verifyConn(ctx context.Context, lggr logger.Logger) NodeState { + state := n.verifyChainID(ctx, lggr) + if state != NodeStateAlive { + return state + } + + if n.nodePoolCfg.NodeIsSyncingEnabled() { + isSyncing, err := n.rpc.IsSyncing(ctx) + if err != nil { + lggr.Errorw("Unexpected error while verifying RPC node synchronization status", "err", err, "nodeState", n.getCachedState()) + return NodeStateUnreachable + } + + if isSyncing { + lggr.Errorw("Verification failed: Node is syncing", "nodeState", n.getCachedState()) + return NodeStateSyncing + } + } + + return NodeStateAlive +} + +func (n *node[CHAIN_ID, HEAD, RPC]) Order() int32 { + return n.order +} + +func (n *node[CHAIN_ID, HEAD, RPC]) newCtx() (context.Context, context.CancelFunc) { + ctx, cancel := n.stopCh.NewCtx() + ctx = CtxAddHealthCheckFlag(ctx) + return ctx, cancel +} diff --git a/pkg/solana/client/multinode/node_fsm.go b/pkg/solana/client/multinode/node_fsm.go new file mode 100644 index 000000000..136910868 --- /dev/null +++ b/pkg/solana/client/multinode/node_fsm.go @@ -0,0 +1,370 @@ +package client + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + promPoolRPCNodeTransitionsToAlive = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_num_transitions_to_alive", + Help: transitionString(NodeStateAlive), + }, []string{"chainID", "nodeName"}) + promPoolRPCNodeTransitionsToInSync = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_num_transitions_to_in_sync", + Help: fmt.Sprintf("%s to %s", transitionString(NodeStateOutOfSync), NodeStateAlive), + }, []string{"chainID", "nodeName"}) + promPoolRPCNodeTransitionsToOutOfSync = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_num_transitions_to_out_of_sync", + Help: transitionString(NodeStateOutOfSync), + }, []string{"chainID", "nodeName"}) + promPoolRPCNodeTransitionsToUnreachable = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_num_transitions_to_unreachable", + Help: transitionString(NodeStateUnreachable), + }, []string{"chainID", "nodeName"}) + promPoolRPCNodeTransitionsToInvalidChainID = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_num_transitions_to_invalid_chain_id", + Help: transitionString(NodeStateInvalidChainID), + }, []string{"chainID", "nodeName"}) + promPoolRPCNodeTransitionsToUnusable = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_num_transitions_to_unusable", + Help: transitionString(NodeStateUnusable), + }, []string{"chainID", "nodeName"}) + promPoolRPCNodeTransitionsToSyncing = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_num_transitions_to_syncing", + Help: transitionString(NodeStateSyncing), + }, []string{"chainID", "nodeName"}) +) + +// NodeState represents the current state of the node +// Node is a FSM (finite state machine) +type NodeState int + +func (n NodeState) String() string { + switch n { + case NodeStateUndialed: + return "Undialed" + case NodeStateDialed: + return "Dialed" + case NodeStateInvalidChainID: + return "InvalidChainID" + case NodeStateAlive: + return "Alive" + case NodeStateUnreachable: + return "Unreachable" + case NodeStateUnusable: + return "Unusable" + case NodeStateOutOfSync: + return "OutOfSync" + case NodeStateClosed: + return "Closed" + case NodeStateSyncing: + return "Syncing" + case 
NodeStateFinalizedBlockOutOfSync: + return "FinalizedBlockOutOfSync" + default: + return fmt.Sprintf("NodeState(%d)", n) + } +} + +// GoString prints a prettier state +func (n NodeState) GoString() string { + return fmt.Sprintf("NodeState%s(%d)", n.String(), n) +} + +const ( + // NodeStateUndialed is the first state of a virgin node + NodeStateUndialed = NodeState(iota) + // NodeStateDialed is after a node has successfully dialed but before it has verified the correct chain ID + NodeStateDialed + // NodeStateInvalidChainID is after chain ID verification failed + NodeStateInvalidChainID + // NodeStateAlive is a healthy node after chain ID verification succeeded + NodeStateAlive + // NodeStateUnreachable is a node that cannot be dialed or has disconnected + NodeStateUnreachable + // NodeStateOutOfSync is a node that is accepting connections but exceeded + // the failure threshold without sending any new heads. It will be + // disconnected, then put into a revive loop and re-awakened after redial + // if a new head arrives + NodeStateOutOfSync + // NodeStateUnusable is a sendonly node that has an invalid URL that can never be reached + NodeStateUnusable + // NodeStateClosed is after the connection has been closed and the node is at the end of its lifecycle + NodeStateClosed + // NodeStateSyncing is a node that is actively back-filling blockchain. Usually, it's a newly set up node that is + // still syncing the chain. The main difference from `NodeStateOutOfSync` is that it represents state relative + // to other primary nodes configured in the MultiNode. In contrast, `NodeStateSyncing` represents the internal state of + // the node (RPC). + NodeStateSyncing + // nodeStateFinalizedBlockOutOfSync - node is lagging behind on latest finalized block + NodeStateFinalizedBlockOutOfSync + // nodeStateLen tracks the number of states + NodeStateLen +) + +// allNodeStates represents all possible states a node can be in +var allNodeStates []NodeState + +func init() { + for s := NodeState(0); s < NodeStateLen; s++ { + allNodeStates = append(allNodeStates, s) + } +} + +// FSM methods + +// State allows reading the current state of the node. +func (n *node[CHAIN_ID, HEAD, RPC]) State() NodeState { + n.stateMu.RLock() + defer n.stateMu.RUnlock() + return n.recalculateState() +} + +func (n *node[CHAIN_ID, HEAD, RPC]) getCachedState() NodeState { + n.stateMu.RLock() + defer n.stateMu.RUnlock() + return n.state +} + +func (n *node[CHAIN_ID, HEAD, RPC]) recalculateState() NodeState { + if n.state != NodeStateAlive { + return n.state + } + + // double check that node is not lagging on finalized block + if n.nodePoolCfg.EnforceRepeatableRead() && n.isFinalizedBlockOutOfSync() { + return NodeStateFinalizedBlockOutOfSync + } + + return NodeStateAlive +} + +func (n *node[CHAIN_ID, HEAD, RPC]) isFinalizedBlockOutOfSync() bool { + if n.poolInfoProvider == nil { + return false + } + + highestObservedByCaller := n.poolInfoProvider.HighestUserObservations() + latest, _ := n.rpc.GetInterceptedChainInfo() + if n.chainCfg.FinalityTagEnabled() { + return latest.FinalizedBlockNumber < highestObservedByCaller.FinalizedBlockNumber-int64(n.chainCfg.FinalizedBlockOffset()) + } + + return latest.BlockNumber < highestObservedByCaller.BlockNumber-int64(n.chainCfg.FinalizedBlockOffset()) +} + +// StateAndLatest returns nodeState with the latest ChainInfo observed by Node during current lifecycle. 
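For the FinalityTagEnabled case, the repeatable-read guard in isFinalizedBlockOutOfSync above reduces to the comparison below; the standalone helper and the numbers are illustrative only.

// Equivalent standalone form of the finality-lag check (illustration, not part of the patch).
func finalizedLagging(nodeFinalized, poolHighestFinalized int64, finalizedBlockOffset uint32) bool {
	return nodeFinalized < poolHighestFinalized-int64(finalizedBlockOffset)
}

// With FinalizedBlockOffset = 2 and the pool's highest finalized block at 100:
//   finalizedLagging(97, 100, 2) == true  -> State() reports NodeStateFinalizedBlockOutOfSync
//   finalizedLagging(98, 100, 2) == false -> the node remains NodeStateAlive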
+func (n *node[CHAIN_ID, HEAD, RPC]) StateAndLatest() (NodeState, ChainInfo) { + n.stateMu.RLock() + defer n.stateMu.RUnlock() + latest, _ := n.rpc.GetInterceptedChainInfo() + return n.recalculateState(), latest +} + +// HighestUserObservations - returns highest ChainInfo ever observed by external user of the Node +func (n *node[CHAIN_ID, HEAD, RPC]) HighestUserObservations() ChainInfo { + _, highestUserObservations := n.rpc.GetInterceptedChainInfo() + return highestUserObservations +} +func (n *node[CHAIN_ID, HEAD, RPC]) SetPoolChainInfoProvider(poolInfoProvider PoolChainInfoProvider) { + n.poolInfoProvider = poolInfoProvider +} + +// setState is only used by internal state management methods. +// This is low-level; care should be taken by the caller to ensure the new state is a valid transition. +// State changes should always be synchronous: only one goroutine at a time should change state. +// n.stateMu should not be locked for long periods of time because external clients expect a timely response from n.State() +func (n *node[CHAIN_ID, HEAD, RPC]) setState(s NodeState) { + n.stateMu.Lock() + defer n.stateMu.Unlock() + n.state = s +} + +// declareXXX methods change the state and pass conrol off the new state +// management goroutine + +func (n *node[CHAIN_ID, HEAD, RPC]) declareAlive() { + n.transitionToAlive(func() { + n.lfcLog.Infow("RPC Node is online", "nodeState", n.state) + n.wg.Add(1) + go n.aliveLoop() + }) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) transitionToAlive(fn func()) { + promPoolRPCNodeTransitionsToAlive.WithLabelValues(n.chainID.String(), n.name).Inc() + n.stateMu.Lock() + defer n.stateMu.Unlock() + if n.state == NodeStateClosed { + return + } + switch n.state { + case NodeStateDialed, NodeStateInvalidChainID, NodeStateSyncing: + n.state = NodeStateAlive + default: + panic(transitionFail(n.state, NodeStateAlive)) + } + fn() +} + +// declareInSync puts a node back into Alive state, allowing it to be used by +// pool consumers again +func (n *node[CHAIN_ID, HEAD, RPC]) declareInSync() { + n.transitionToInSync(func() { + n.lfcLog.Infow("RPC Node is back in sync", "nodeState", n.state) + n.wg.Add(1) + go n.aliveLoop() + }) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInSync(fn func()) { + promPoolRPCNodeTransitionsToAlive.WithLabelValues(n.chainID.String(), n.name).Inc() + promPoolRPCNodeTransitionsToInSync.WithLabelValues(n.chainID.String(), n.name).Inc() + n.stateMu.Lock() + defer n.stateMu.Unlock() + if n.state == NodeStateClosed { + return + } + switch n.state { + case NodeStateOutOfSync, NodeStateSyncing: + n.state = NodeStateAlive + default: + panic(transitionFail(n.state, NodeStateAlive)) + } + fn() +} + +// declareOutOfSync puts a node into OutOfSync state, disconnecting all current +// clients and making it unavailable for use until back in-sync. 
+func (n *node[CHAIN_ID, HEAD, RPC]) declareOutOfSync(syncIssues syncStatus) { + n.transitionToOutOfSync(func() { + n.lfcLog.Errorw("RPC Node is out of sync", "nodeState", n.state, "syncIssues", syncIssues) + n.wg.Add(1) + go n.outOfSyncLoop(syncIssues) + }) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) transitionToOutOfSync(fn func()) { + promPoolRPCNodeTransitionsToOutOfSync.WithLabelValues(n.chainID.String(), n.name).Inc() + n.stateMu.Lock() + defer n.stateMu.Unlock() + if n.state == NodeStateClosed { + return + } + switch n.state { + case NodeStateAlive: + n.rpc.Close() + n.state = NodeStateOutOfSync + default: + panic(transitionFail(n.state, NodeStateOutOfSync)) + } + fn() +} + +func (n *node[CHAIN_ID, HEAD, RPC]) declareUnreachable() { + n.transitionToUnreachable(func() { + n.lfcLog.Errorw("RPC Node is unreachable", "nodeState", n.state) + n.wg.Add(1) + go n.unreachableLoop() + }) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) { + promPoolRPCNodeTransitionsToUnreachable.WithLabelValues(n.chainID.String(), n.name).Inc() + n.stateMu.Lock() + defer n.stateMu.Unlock() + if n.state == NodeStateClosed { + return + } + switch n.state { + case NodeStateUndialed, NodeStateDialed, NodeStateAlive, NodeStateOutOfSync, NodeStateInvalidChainID, NodeStateSyncing: + n.rpc.Close() + n.state = NodeStateUnreachable + default: + panic(transitionFail(n.state, NodeStateUnreachable)) + } + fn() +} + +func (n *node[CHAIN_ID, HEAD, RPC]) declareState(state NodeState) { + if n.getCachedState() == NodeStateClosed { + return + } + switch state { + case NodeStateInvalidChainID: + n.declareInvalidChainID() + case NodeStateUnreachable: + n.declareUnreachable() + case NodeStateSyncing: + n.declareSyncing() + case NodeStateAlive: + n.declareAlive() + default: + panic(fmt.Sprintf("%#v state declaration is not implemented", state)) + } +} + +func (n *node[CHAIN_ID, HEAD, RPC]) declareInvalidChainID() { + n.transitionToInvalidChainID(func() { + n.lfcLog.Errorw("RPC Node has the wrong chain ID", "nodeState", n.state) + n.wg.Add(1) + go n.invalidChainIDLoop() + }) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) { + promPoolRPCNodeTransitionsToInvalidChainID.WithLabelValues(n.chainID.String(), n.name).Inc() + n.stateMu.Lock() + defer n.stateMu.Unlock() + if n.state == NodeStateClosed { + return + } + switch n.state { + case NodeStateDialed, NodeStateOutOfSync, NodeStateSyncing: + n.rpc.Close() + n.state = NodeStateInvalidChainID + default: + panic(transitionFail(n.state, NodeStateInvalidChainID)) + } + fn() +} + +func (n *node[CHAIN_ID, HEAD, RPC]) declareSyncing() { + n.transitionToSyncing(func() { + n.lfcLog.Errorw("RPC Node is syncing", "nodeState", n.state) + n.wg.Add(1) + go n.syncingLoop() + }) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) { + promPoolRPCNodeTransitionsToSyncing.WithLabelValues(n.chainID.String(), n.name).Inc() + n.stateMu.Lock() + defer n.stateMu.Unlock() + if n.state == NodeStateClosed { + return + } + switch n.state { + case NodeStateDialed, NodeStateOutOfSync, NodeStateInvalidChainID: + n.rpc.Close() + n.state = NodeStateSyncing + default: + panic(transitionFail(n.state, NodeStateSyncing)) + } + + if !n.nodePoolCfg.NodeIsSyncingEnabled() { + panic("unexpected transition to NodeStateSyncing, while it's disabled") + } + fn() +} + +func transitionString(state NodeState) string { + return fmt.Sprintf("Total number of times node has transitioned to %s", state) +} + +func transitionFail(from NodeState, to NodeState) 
string { + return fmt.Sprintf("cannot transition from %#v to %#v", from, to) +} diff --git a/pkg/solana/client/multinode/node_lifecycle.go b/pkg/solana/client/multinode/node_lifecycle.go new file mode 100644 index 000000000..d6b150690 --- /dev/null +++ b/pkg/solana/client/multinode/node_lifecycle.go @@ -0,0 +1,687 @@ +package client + +import ( + "context" + "fmt" + "math" + "math/big" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils" + bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" +) + +var ( + promPoolRPCNodeHighestSeenBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "solana_pool_rpc_node_highest_seen_block", + Help: "The highest seen block for the given RPC node", + }, []string{"chainID", "nodeName"}) + promPoolRPCNodeHighestFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "solana_pool_rpc_node_highest_finalized_block", + Help: "The highest seen finalized block for the given RPC node", + }, []string{"chainID", "nodeName"}) + promPoolRPCNodeNumSeenBlocks = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_num_seen_blocks", + Help: "The total number of new blocks seen by the given RPC node", + }, []string{"chainID", "nodeName"}) + promPoolRPCNodePolls = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_polls_total", + Help: "The total number of poll checks for the given RPC node", + }, []string{"chainID", "nodeName"}) + promPoolRPCNodePollsFailed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_polls_failed", + Help: "The total number of failed poll checks for the given RPC node", + }, []string{"chainID", "nodeName"}) + promPoolRPCNodePollsSuccess = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_pool_rpc_node_polls_success", + Help: "The total number of successful poll checks for the given RPC node", + }, []string{"chainID", "nodeName"}) +) + +// zombieNodeCheckInterval controls how often to re-check to see if we need to +// state change in case we have to force a state transition due to no available +// nodes. +// NOTE: This only applies to out-of-sync nodes if they are the last available node +func zombieNodeCheckInterval(noNewHeadsThreshold time.Duration) time.Duration { + interval := noNewHeadsThreshold + if interval <= 0 || interval > QueryTimeout { + interval = QueryTimeout + } + return utils.WithJitter(interval) +} + +const ( + msgCannotDisable = "but cannot disable this connection because there are no other RPC endpoints, or all other RPC endpoints are dead." + msgDegradedState = "Chainlink is now operating in a degraded state and urgent action is required to resolve the issue" +) + +// Node is a FSM +// Each state has a loop that goes with it, which monitors the node and moves it into another state as necessary. +// Only one loop must run at a time. +// Each loop passes control onto the next loop as it exits, except when the node is Closed which terminates the loop permanently. 
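For reference, the table below condenses the state transitions allowed by the transitionTo* switches in node_fsm.go (plus the initial setState(NodeStateDialed) in start()); it is not part of the patch. Any non-Closed state can additionally move to NodeStateClosed via close(), and NodeStateFinalizedBlockOutOfSync is derived on the fly in recalculateState rather than stored.

// Condensed view of permitted stored-state transitions (reference only).
var allowedTransitions = map[NodeState][]NodeState{
	NodeStateUndialed:       {NodeStateDialed, NodeStateUnreachable},
	NodeStateDialed:         {NodeStateAlive, NodeStateInvalidChainID, NodeStateSyncing, NodeStateUnreachable},
	NodeStateAlive:          {NodeStateOutOfSync, NodeStateUnreachable},
	NodeStateOutOfSync:      {NodeStateAlive, NodeStateInvalidChainID, NodeStateSyncing, NodeStateUnreachable},
	NodeStateInvalidChainID: {NodeStateAlive, NodeStateSyncing, NodeStateUnreachable},
	NodeStateSyncing:        {NodeStateAlive, NodeStateInvalidChainID, NodeStateUnreachable},
}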
+ +// This handles node lifecycle for the ALIVE state +// Should only be run ONCE per node, after a successful Dial +func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { + defer n.wg.Done() + ctx, cancel := n.newCtx() + defer cancel() + + { + // sanity check + state := n.getCachedState() + switch state { + case NodeStateAlive: + case NodeStateClosed: + return + default: + panic(fmt.Sprintf("aliveLoop can only run for node in Alive state, got: %s", state)) + } + } + + noNewHeadsTimeoutThreshold := n.chainCfg.NodeNoNewHeadsThreshold() + noNewFinalizedBlocksTimeoutThreshold := n.chainCfg.NoNewFinalizedHeadsThreshold() + pollFailureThreshold := n.nodePoolCfg.PollFailureThreshold() + pollInterval := n.nodePoolCfg.PollInterval() + + lggr := logger.Sugared(n.lfcLog).Named("Alive").With("noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold, "pollInterval", pollInterval, "pollFailureThreshold", pollFailureThreshold) + lggr.Tracew("Alive loop starting", "nodeState", n.getCachedState()) + + headsSub, err := n.registerNewSubscription(ctx, lggr.With("subscriptionType", "heads"), + n.chainCfg.NodeNoNewHeadsThreshold(), n.rpc.SubscribeToHeads) + if err != nil { + lggr.Errorw("Initial subscribe for heads failed", "nodeState", n.getCachedState(), "err", err) + n.declareUnreachable() + return + } + + defer n.unsubscribeHealthChecks() + + var pollCh <-chan time.Time + if pollInterval > 0 { + lggr.Debug("Polling enabled") + pollT := time.NewTicker(pollInterval) + defer pollT.Stop() + pollCh = pollT.C + if pollFailureThreshold > 0 { + // polling can be enabled with no threshold to enable polling but + // the node will not be marked offline regardless of the number of + // poll failures + lggr.Debug("Polling liveness checking enabled") + } + } else { + lggr.Debug("Polling disabled") + } + + var finalizedHeadsSub headSubscription[HEAD] + if n.chainCfg.FinalityTagEnabled() { + finalizedHeadsSub, err = n.registerNewSubscription(ctx, lggr.With("subscriptionType", "finalizedHeads"), + n.chainCfg.NoNewFinalizedHeadsThreshold(), n.rpc.SubscribeToFinalizedHeads) + if err != nil { + lggr.Errorw("Failed to subscribe to finalized heads", "err", err) + n.declareUnreachable() + return + } + } + + localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo() + var pollFailures uint32 + + for { + select { + case <-ctx.Done(): + return + case <-pollCh: + promPoolRPCNodePolls.WithLabelValues(n.chainID.String(), n.name).Inc() + lggr.Tracew("Pinging RPC", "nodeState", n.State(), "pollFailures", pollFailures) + pollCtx, cancel := context.WithTimeout(ctx, pollInterval) + err = n.RPC().Ping(pollCtx) + cancel() + if err != nil { + // prevent overflow + if pollFailures < math.MaxUint32 { + promPoolRPCNodePollsFailed.WithLabelValues(n.chainID.String(), n.name).Inc() + pollFailures++ + } + lggr.Warnw(fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", n.String()), "err", err, "pollFailures", pollFailures, "nodeState", n.getCachedState()) + } else { + lggr.Debugw("Ping successful", "nodeState", n.State()) + promPoolRPCNodePollsSuccess.WithLabelValues(n.chainID.String(), n.name).Inc() + pollFailures = 0 + } + if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold { + lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState()) + if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 { + lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, 
msgDegradedState) + continue + } + } + n.declareUnreachable() + return + } + _, latestChainInfo := n.StateAndLatest() + if outOfSync, liveNodes := n.isOutOfSyncWithPool(latestChainInfo); outOfSync { + // note: there must be another live node for us to be out of sync + lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState()) + if liveNodes < 2 { + lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) + continue + } + n.declareOutOfSync(syncStatusNotInSyncWithPool) + return + } + case bh, open := <-headsSub.Heads: + if !open { + lggr.Errorw("Subscription channel unexpectedly closed", "nodeState", n.getCachedState()) + n.declareUnreachable() + return + } + receivedNewHead := n.onNewHead(lggr, &localHighestChainInfo, bh) + if receivedNewHead && noNewHeadsTimeoutThreshold > 0 { + headsSub.ResetTimer(noNewHeadsTimeoutThreshold) + } + case err = <-headsSub.Errors: + lggr.Errorw("Subscription was terminated", "err", err, "nodeState", n.getCachedState()) + n.declareUnreachable() + return + case <-headsSub.NoNewHeads: + // We haven't received a head on the channel for at least the + // threshold amount of time, mark it broken + lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) + if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 { + lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) + // We don't necessarily want to wait the full timeout to check again, we should + // check regularly and log noisily in this state + headsSub.ResetTimer(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)) + continue + } + } + n.declareOutOfSync(syncStatusNoNewHead) + return + case latestFinalized, open := <-finalizedHeadsSub.Heads: + if !open { + lggr.Errorw("Finalized heads subscription channel unexpectedly closed") + n.declareUnreachable() + return + } + + receivedNewHead := n.onNewFinalizedHead(lggr, &localHighestChainInfo, latestFinalized) + if receivedNewHead && noNewFinalizedBlocksTimeoutThreshold > 0 { + finalizedHeadsSub.ResetTimer(noNewFinalizedBlocksTimeoutThreshold) + } + case <-finalizedHeadsSub.NoNewHeads: + // We haven't received a finalized head on the channel for at least the + // threshold amount of time, mark it broken + lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber) + if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 { + lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState) + // We don't necessarily want to wait the full timeout to check again, we should + // check regularly and log noisily in this state + finalizedHeadsSub.ResetTimer(zombieNodeCheckInterval(noNewFinalizedBlocksTimeoutThreshold)) + continue + } + } + n.declareOutOfSync(syncStatusNoNewFinalizedHead) + return + case <-finalizedHeadsSub.Errors: + lggr.Errorw("Finalized heads subscription was 
terminated", "err", err) + n.declareUnreachable() + return + } + } +} + +func (n *node[CHAIN_ID, HEAD, RPC]) unsubscribeHealthChecks() { + n.stateMu.Lock() + for _, sub := range n.healthCheckSubs { + sub.Unsubscribe() + } + n.healthCheckSubs = []Subscription{} + n.stateMu.Unlock() +} + +type headSubscription[HEAD any] struct { + Heads <-chan HEAD + Errors <-chan error + NoNewHeads <-chan time.Time + + noNewHeadsTicker *time.Ticker + sub Subscription + cleanUpTasks []func() +} + +func (sub *headSubscription[HEAD]) ResetTimer(duration time.Duration) { + sub.noNewHeadsTicker.Reset(duration) +} + +func (sub *headSubscription[HEAD]) Unsubscribe() { + for _, doCleanUp := range sub.cleanUpTasks { + doCleanUp() + } +} + +func (n *node[CHAIN_ID, HEAD, PRC]) registerNewSubscription(ctx context.Context, lggr logger.SugaredLogger, + noNewDataThreshold time.Duration, newSub func(ctx context.Context) (<-chan HEAD, Subscription, error)) (headSubscription[HEAD], error) { + result := headSubscription[HEAD]{} + var err error + var sub Subscription + result.Heads, sub, err = newSub(ctx) + if err != nil { + return result, err + } + + result.Errors = sub.Err() + lggr.Debug("Successfully subscribed") + + result.sub = sub + n.stateMu.Lock() + n.healthCheckSubs = append(n.healthCheckSubs, sub) + n.stateMu.Unlock() + + result.cleanUpTasks = append(result.cleanUpTasks, sub.Unsubscribe) + + if noNewDataThreshold > 0 { + lggr.Debugw("Subscription liveness checking enabled") + result.noNewHeadsTicker = time.NewTicker(noNewDataThreshold) + result.NoNewHeads = result.noNewHeadsTicker.C + result.cleanUpTasks = append(result.cleanUpTasks, result.noNewHeadsTicker.Stop) + } else { + lggr.Debug("Subscription liveness checking disabled") + } + + return result, nil +} + +func (n *node[CHAIN_ID, HEAD, RPC]) onNewFinalizedHead(lggr logger.SugaredLogger, chainInfo *ChainInfo, latestFinalized HEAD) bool { + if !latestFinalized.IsValid() { + lggr.Warn("Latest finalized block is not valid") + return false + } + + latestFinalizedBN := latestFinalized.BlockNumber() + lggr.Tracew("Got latest finalized head", "latestFinalized", latestFinalized) + if latestFinalizedBN <= chainInfo.FinalizedBlockNumber { + lggr.Tracew("Ignoring previously seen finalized block number") + return false + } + + promPoolRPCNodeHighestFinalizedBlock.WithLabelValues(n.chainID.String(), n.name).Set(float64(latestFinalizedBN)) + chainInfo.FinalizedBlockNumber = latestFinalizedBN + return true +} + +func (n *node[CHAIN_ID, HEAD, RPC]) onNewHead(lggr logger.SugaredLogger, chainInfo *ChainInfo, head HEAD) bool { + if !head.IsValid() { + lggr.Warn("Latest head is not valid") + return false + } + + promPoolRPCNodeNumSeenBlocks.WithLabelValues(n.chainID.String(), n.name).Inc() + lggr.Tracew("Got head", "head", head) + lggr = lggr.With("latestReceivedBlockNumber", chainInfo.BlockNumber, "blockNumber", head.BlockNumber(), "nodeState", n.getCachedState()) + if head.BlockNumber() <= chainInfo.BlockNumber { + lggr.Tracew("Ignoring previously seen block number") + return false + } + + promPoolRPCNodeHighestSeenBlock.WithLabelValues(n.chainID.String(), n.name).Set(float64(head.BlockNumber())) + chainInfo.BlockNumber = head.BlockNumber() + + if !n.chainCfg.FinalityTagEnabled() { + latestFinalizedBN := max(head.BlockNumber()-int64(n.chainCfg.FinalityDepth()), 0) + if latestFinalizedBN > chainInfo.FinalizedBlockNumber { + promPoolRPCNodeHighestFinalizedBlock.WithLabelValues(n.chainID.String(), n.name).Set(float64(latestFinalizedBN)) + chainInfo.FinalizedBlockNumber = 
latestFinalizedBN + } + } + + return true +} + +const ( + msgReceivedBlock = "Received block for RPC node, waiting until back in-sync to mark as live again" + msgReceivedFinalizedBlock = "Received new finalized block for RPC node, waiting until back in-sync to mark as live again" + msgInSync = "RPC node back in sync" +) + +// isOutOfSyncWithPool returns outOfSync true if num or td is more than SyncThresold behind the best node. +// Always returns outOfSync false for SyncThreshold 0. +// liveNodes is only included when outOfSync is true. +func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (outOfSync bool, liveNodes int) { + if n.poolInfoProvider == nil { + n.lfcLog.Warn("skipping sync state against the pool - should only occur in tests") + return // skip for tests + } + threshold := n.nodePoolCfg.SyncThreshold() + if threshold == 0 { + return // disabled + } + // Check against best node + ln, ci := n.poolInfoProvider.LatestChainInfo() + mode := n.nodePoolCfg.SelectionMode() + switch mode { + case NodeSelectionModeHighestHead, NodeSelectionModeRoundRobin, NodeSelectionModePriorityLevel: + return localState.BlockNumber < ci.BlockNumber-int64(threshold), ln + case NodeSelectionModeTotalDifficulty: + bigThreshold := big.NewInt(int64(threshold)) + return localState.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln + default: + panic("unrecognized NodeSelectionMode: " + mode) + } +} + +// outOfSyncLoop takes an OutOfSync node and waits until isOutOfSync returns false to go back to live status +func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { + defer n.wg.Done() + ctx, cancel := n.newCtx() + defer cancel() + + { + // sanity check + state := n.getCachedState() + switch state { + case NodeStateOutOfSync: + case NodeStateClosed: + return + default: + panic(fmt.Sprintf("outOfSyncLoop can only run for node in OutOfSync state, got: %s", state)) + } + } + + outOfSyncAt := time.Now() + + // set logger name to OutOfSync or FinalizedBlockOutOfSync + lggr := logger.Sugared(logger.Named(n.lfcLog, n.getCachedState().String())).With("nodeState", n.getCachedState()) + lggr.Debugw("Trying to revive out-of-sync RPC node") + + // Need to redial since out-of-sync nodes are automatically disconnected + state := n.createVerifiedConn(ctx, lggr) + if state != NodeStateAlive { + n.declareState(state) + return + } + + noNewHeadsTimeoutThreshold := n.chainCfg.NodeNoNewHeadsThreshold() + headsSub, err := n.registerNewSubscription(ctx, lggr.With("subscriptionType", "heads"), + noNewHeadsTimeoutThreshold, n.rpc.SubscribeToHeads) + if err != nil { + lggr.Errorw("Failed to subscribe heads on out-of-sync RPC node", "err", err) + n.declareUnreachable() + return + } + + defer n.unsubscribeHealthChecks() + + lggr.Tracew("Successfully subscribed to heads feed on out-of-sync RPC node") + + noNewFinalizedBlocksTimeoutThreshold := n.chainCfg.NoNewFinalizedHeadsThreshold() + var finalizedHeadsSub headSubscription[HEAD] + if n.chainCfg.FinalityTagEnabled() { + finalizedHeadsSub, err = n.registerNewSubscription(ctx, lggr.With("subscriptionType", "finalizedHeads"), + noNewFinalizedBlocksTimeoutThreshold, n.rpc.SubscribeToFinalizedHeads) + if err != nil { + lggr.Errorw("Subscribe to finalized heads failed on out-of-sync RPC node", "err", err) + n.declareUnreachable() + return + } + + lggr.Tracew("Successfully subscribed to finalized heads feed on out-of-sync RPC node") + } + + _, localHighestChainInfo := n.rpc.GetInterceptedChainInfo() + for { + if syncIssues == 
syncStatusSynced { + // back in-sync! flip back into alive loop + lggr.Infow(fmt.Sprintf("%s: %s. Node was out-of-sync for %s", msgInSync, n.String(), time.Since(outOfSyncAt))) + n.declareInSync() + return + } + + select { + case <-ctx.Done(): + return + case head, open := <-headsSub.Heads: + if !open { + lggr.Errorw("Subscription channel unexpectedly closed", "nodeState", n.getCachedState()) + n.declareUnreachable() + return + } + + if !n.onNewHead(lggr, &localHighestChainInfo, head) { + continue + } + + // received a new head - clear NoNewHead flag + syncIssues &= ^syncStatusNoNewHead + if outOfSync, _ := n.isOutOfSyncWithPool(localHighestChainInfo); !outOfSync { + // we caught up with the pool - clear NotInSyncWithPool flag + syncIssues &= ^syncStatusNotInSyncWithPool + } else { + // we've received new head, but lagging behind the pool, add NotInSyncWithPool flag to prevent false transition to alive + syncIssues |= syncStatusNotInSyncWithPool + } + + if noNewHeadsTimeoutThreshold > 0 { + headsSub.ResetTimer(noNewHeadsTimeoutThreshold) + } + + lggr.Debugw(msgReceivedBlock, "blockNumber", head.BlockNumber(), "blockDifficulty", head.BlockDifficulty(), "syncIssues", syncIssues) + case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)): + if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 1 { + lggr.Criticalw("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state", "syncIssues", syncIssues) + n.declareInSync() + return + } + } + case err := <-headsSub.Errors: + lggr.Errorw("Subscription was terminated", "err", err) + n.declareUnreachable() + return + case <-headsSub.NoNewHeads: + // we are not resetting the timer, as there is no need to add syncStatusNoNewHead until it's removed on new head. + syncIssues |= syncStatusNoNewHead + lggr.Debugw(fmt.Sprintf("No new heads received for %s. Node stays out-of-sync due to sync issues: %s", noNewHeadsTimeoutThreshold, syncIssues)) + case latestFinalized, open := <-finalizedHeadsSub.Heads: + if !open { + lggr.Errorw("Finalized heads subscription channel unexpectedly closed") + n.declareUnreachable() + return + } + if !latestFinalized.IsValid() { + lggr.Warn("Latest finalized block is not valid") + continue + } + + receivedNewHead := n.onNewFinalizedHead(lggr, &localHighestChainInfo, latestFinalized) + if !receivedNewHead { + continue + } + + // on new finalized head remove NoNewFinalizedHead flag from the mask + syncIssues &= ^syncStatusNoNewFinalizedHead + if noNewFinalizedBlocksTimeoutThreshold > 0 { + finalizedHeadsSub.ResetTimer(noNewFinalizedBlocksTimeoutThreshold) + } + + lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "syncIssues", syncIssues) + case err := <-finalizedHeadsSub.Errors: + lggr.Errorw("Finalized head subscription was terminated", "err", err) + n.declareUnreachable() + return + case <-finalizedHeadsSub.NoNewHeads: + // we are not resetting the timer, as there is no need to add syncStatusNoNewFinalizedHead until it's removed on new finalized head. + syncIssues |= syncStatusNoNewFinalizedHead + lggr.Debugw(fmt.Sprintf("No new finalized heads received for %s. 
Node stays out-of-sync due to sync issues: %s", noNewFinalizedBlocksTimeoutThreshold, syncIssues)) + } + } +} + +func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() { + defer n.wg.Done() + ctx, cancel := n.newCtx() + defer cancel() + + { + // sanity check + state := n.getCachedState() + switch state { + case NodeStateUnreachable: + case NodeStateClosed: + return + default: + panic(fmt.Sprintf("unreachableLoop can only run for node in Unreachable state, got: %s", state)) + } + } + + unreachableAt := time.Now() + + lggr := logger.Sugared(logger.Named(n.lfcLog, "Unreachable")) + lggr.Debugw("Trying to revive unreachable RPC node", "nodeState", n.getCachedState()) + + dialRetryBackoff := NewRedialBackoff() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(dialRetryBackoff.Duration()): + lggr.Tracew("Trying to re-dial RPC node", "nodeState", n.getCachedState()) + + err := n.rpc.Dial(ctx) + if err != nil { + lggr.Errorw(fmt.Sprintf("Failed to redial RPC node; still unreachable: %v", err), "err", err, "nodeState", n.getCachedState()) + continue + } + + n.setState(NodeStateDialed) + + state := n.verifyConn(ctx, lggr) + switch state { + case NodeStateUnreachable: + n.setState(NodeStateUnreachable) + continue + case NodeStateAlive: + lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Node was offline for %s", n.String(), time.Since(unreachableAt)), "nodeState", n.getCachedState()) + fallthrough + default: + n.declareState(state) + return + } + } + } +} + +func (n *node[CHAIN_ID, HEAD, RPC]) invalidChainIDLoop() { + defer n.wg.Done() + ctx, cancel := n.newCtx() + defer cancel() + + { + // sanity check + state := n.getCachedState() + switch state { + case NodeStateInvalidChainID: + case NodeStateClosed: + return + default: + panic(fmt.Sprintf("invalidChainIDLoop can only run for node in InvalidChainID state, got: %s", state)) + } + } + + invalidAt := time.Now() + + lggr := logger.Named(n.lfcLog, "InvalidChainID") + + // Need to redial since invalid chain ID nodes are automatically disconnected + state := n.createVerifiedConn(ctx, lggr) + if state != NodeStateInvalidChainID { + n.declareState(state) + return + } + + lggr.Debugw(fmt.Sprintf("Periodically re-checking RPC node %s with invalid chain ID", n.String()), "nodeState", n.getCachedState()) + + chainIDRecheckBackoff := NewRedialBackoff() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(chainIDRecheckBackoff.Duration()): + state := n.verifyConn(ctx, lggr) + switch state { + case NodeStateInvalidChainID: + continue + case NodeStateAlive: + lggr.Infow(fmt.Sprintf("Successfully verified RPC node. 
Node was offline for %s", time.Since(invalidAt)), "nodeState", n.getCachedState()) + fallthrough + default: + n.declareState(state) + return + } + } + } +} + +func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() { + defer n.wg.Done() + ctx, cancel := n.newCtx() + defer cancel() + + { + // sanity check + state := n.getCachedState() + switch state { + case NodeStateSyncing: + case NodeStateClosed: + return + default: + panic(fmt.Sprintf("syncingLoop can only run for node in NodeStateSyncing state, got: %s", state)) + } + } + + syncingAt := time.Now() + + lggr := logger.Sugared(logger.Named(n.lfcLog, "Syncing")) + lggr.Debugw(fmt.Sprintf("Periodically re-checking RPC node %s with syncing status", n.String()), "nodeState", n.getCachedState()) + // Need to redial since syncing nodes are automatically disconnected + state := n.createVerifiedConn(ctx, lggr) + if state != NodeStateSyncing { + n.declareState(state) + return + } + + recheckBackoff := NewRedialBackoff() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(recheckBackoff.Duration()): + lggr.Tracew("Trying to recheck if the node is still syncing", "nodeState", n.getCachedState()) + isSyncing, err := n.rpc.IsSyncing(ctx) + if err != nil { + lggr.Errorw("Unexpected error while verifying RPC node synchronization status", "err", err, "nodeState", n.getCachedState()) + n.declareUnreachable() + return + } + + if isSyncing { + lggr.Errorw("Verification failed: Node is syncing", "nodeState", n.getCachedState()) + continue + } + + lggr.Infow(fmt.Sprintf("Successfully verified RPC node. Node was syncing for %s", time.Since(syncingAt)), "nodeState", n.getCachedState()) + n.declareAlive() + return + } + } +} diff --git a/pkg/solana/client/multinode/node_selector.go b/pkg/solana/client/multinode/node_selector.go new file mode 100644 index 000000000..872026fe2 --- /dev/null +++ b/pkg/solana/client/multinode/node_selector.go @@ -0,0 +1,41 @@ +package client + +import ( + "fmt" +) + +const ( + NodeSelectionModeHighestHead = "HighestHead" + NodeSelectionModeRoundRobin = "RoundRobin" + NodeSelectionModeTotalDifficulty = "TotalDifficulty" + NodeSelectionModePriorityLevel = "PriorityLevel" +) + +type NodeSelector[ + CHAIN_ID ID, + RPC any, +] interface { + // Select returns a Node, or nil if none can be selected. + // Implementation must be thread-safe. + Select() Node[CHAIN_ID, RPC] + // Name returns the strategy name, e.g. 
"HighestHead" or "RoundRobin" + Name() string +} + +func newNodeSelector[ + CHAIN_ID ID, + RPC any, +](selectionMode string, nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC] { + switch selectionMode { + case NodeSelectionModeHighestHead: + return NewHighestHeadNodeSelector[CHAIN_ID, RPC](nodes) + case NodeSelectionModeRoundRobin: + return NewRoundRobinSelector[CHAIN_ID, RPC](nodes) + case NodeSelectionModeTotalDifficulty: + return NewTotalDifficultyNodeSelector[CHAIN_ID, RPC](nodes) + case NodeSelectionModePriorityLevel: + return NewPriorityLevelNodeSelector[CHAIN_ID, RPC](nodes) + default: + panic(fmt.Sprintf("unsupported NodeSelectionMode: %s", selectionMode)) + } +} diff --git a/pkg/solana/client/multinode/node_selector_highest_head.go b/pkg/solana/client/multinode/node_selector_highest_head.go new file mode 100644 index 000000000..68901cba3 --- /dev/null +++ b/pkg/solana/client/multinode/node_selector_highest_head.go @@ -0,0 +1,36 @@ +package client + +import "math" + +type highestHeadNodeSelector[ + CHAIN_ID ID, + RPC any, +] []Node[CHAIN_ID, RPC] + +func NewHighestHeadNodeSelector[ + CHAIN_ID ID, + RPC any, +](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC] { + return highestHeadNodeSelector[CHAIN_ID, RPC](nodes) +} + +func (s highestHeadNodeSelector[CHAIN_ID, RPC]) Select() Node[CHAIN_ID, RPC] { + var highestHeadNumber int64 = math.MinInt64 + var highestHeadNodes []Node[CHAIN_ID, RPC] + for _, n := range s { + state, currentChainInfo := n.StateAndLatest() + currentHeadNumber := currentChainInfo.BlockNumber + if state == NodeStateAlive && currentHeadNumber >= highestHeadNumber { + if highestHeadNumber < currentHeadNumber { + highestHeadNumber = currentHeadNumber + highestHeadNodes = nil + } + highestHeadNodes = append(highestHeadNodes, n) + } + } + return firstOrHighestPriority(highestHeadNodes) +} + +func (s highestHeadNodeSelector[CHAIN_ID, RPC]) Name() string { + return NodeSelectionModeHighestHead +} diff --git a/pkg/solana/client/multinode/node_selector_priority_level.go b/pkg/solana/client/multinode/node_selector_priority_level.go new file mode 100644 index 000000000..ead720976 --- /dev/null +++ b/pkg/solana/client/multinode/node_selector_priority_level.go @@ -0,0 +1,121 @@ +package client + +import ( + "math" + "sort" + "sync/atomic" +) + +type priorityLevelNodeSelector[ + CHAIN_ID ID, + RPC any, +] struct { + nodes []Node[CHAIN_ID, RPC] + roundRobinCount []atomic.Uint32 +} + +type nodeWithPriority[ + CHAIN_ID ID, + RPC any, +] struct { + node Node[CHAIN_ID, RPC] + priority int32 +} + +func NewPriorityLevelNodeSelector[ + CHAIN_ID ID, + RPC any, +](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC] { + return &priorityLevelNodeSelector[CHAIN_ID, RPC]{ + nodes: nodes, + roundRobinCount: make([]atomic.Uint32, nrOfPriorityTiers(nodes)), + } +} + +func (s priorityLevelNodeSelector[CHAIN_ID, RPC]) Select() Node[CHAIN_ID, RPC] { + nodes := s.getHighestPriorityAliveTier() + + if len(nodes) == 0 { + return nil + } + priorityLevel := nodes[len(nodes)-1].priority + + // NOTE: Inc returns the number after addition, so we must -1 to get the "current" counter + count := int(s.roundRobinCount[priorityLevel].Add(1) - 1) + idx := count % len(nodes) + + return nodes[idx].node +} + +func (s priorityLevelNodeSelector[CHAIN_ID, RPC]) Name() string { + return NodeSelectionModePriorityLevel +} + +// getHighestPriorityAliveTier filters nodes that are not in state NodeStateAlive and +// returns only the highest tier of alive nodes +func (s priorityLevelNodeSelector[CHAIN_ID, 
RPC]) getHighestPriorityAliveTier() []nodeWithPriority[CHAIN_ID, RPC] { + var nodes []nodeWithPriority[CHAIN_ID, RPC] + for _, n := range s.nodes { + if n.State() == NodeStateAlive { + nodes = append(nodes, nodeWithPriority[CHAIN_ID, RPC]{n, n.Order()}) + } + } + + if len(nodes) == 0 { + return nil + } + + return removeLowerTiers(nodes) +} + +// removeLowerTiers take a slice of nodeWithPriority[CHAIN_ID, BLOCK_HASH, HEAD, RPC] and keeps only the highest tier +func removeLowerTiers[ + CHAIN_ID ID, + RPC any, +](nodes []nodeWithPriority[CHAIN_ID, RPC]) []nodeWithPriority[CHAIN_ID, RPC] { + sort.SliceStable(nodes, func(i, j int) bool { + return nodes[i].priority > nodes[j].priority + }) + + var nodes2 []nodeWithPriority[CHAIN_ID, RPC] + currentPriority := nodes[len(nodes)-1].priority + + for _, n := range nodes { + if n.priority == currentPriority { + nodes2 = append(nodes2, n) + } + } + + return nodes2 +} + +// nrOfPriorityTiers calculates the total number of priority tiers +func nrOfPriorityTiers[ + CHAIN_ID ID, + RPC any, +](nodes []Node[CHAIN_ID, RPC]) int32 { + highestPriority := int32(0) + for _, n := range nodes { + priority := n.Order() + if highestPriority < priority { + highestPriority = priority + } + } + return highestPriority + 1 +} + +// firstOrHighestPriority takes a list of nodes and returns the first one with the highest priority +func firstOrHighestPriority[ + CHAIN_ID ID, + RPC any, +](nodes []Node[CHAIN_ID, RPC]) Node[CHAIN_ID, RPC] { + hp := int32(math.MaxInt32) + var node Node[CHAIN_ID, RPC] + for _, n := range nodes { + if n.Order() < hp { + hp = n.Order() + node = n + } + } + return node +} diff --git a/pkg/solana/client/multinode/node_selector_round_robin.go b/pkg/solana/client/multinode/node_selector_round_robin.go new file mode 100644 index 000000000..c5ed8d853 --- /dev/null +++ b/pkg/solana/client/multinode/node_selector_round_robin.go @@ -0,0 +1,46 @@ +package client + +import ( + "sync/atomic" +) + +type roundRobinSelector[ + CHAIN_ID ID, + RPC any, +] struct { + nodes []Node[CHAIN_ID, RPC] + roundRobinCount atomic.Uint32 +} + +func NewRoundRobinSelector[ + CHAIN_ID ID, + RPC any, +](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC] { + return &roundRobinSelector[CHAIN_ID, RPC]{ + nodes: nodes, + } +} + +func (s *roundRobinSelector[CHAIN_ID, RPC]) Select() Node[CHAIN_ID, RPC] { + var liveNodes []Node[CHAIN_ID, RPC] + for _, n := range s.nodes { + if n.State() == NodeStateAlive { + liveNodes = append(liveNodes, n) + } + } + + nNodes := len(liveNodes) + if nNodes == 0 { + return nil + } + + // NOTE: Inc returns the number after addition, so we must -1 to get the "current" counter + count := int(s.roundRobinCount.Add(1) - 1) + idx := count % nNodes + + return liveNodes[idx] +} + +func (s *roundRobinSelector[CHAIN_ID, RPC]) Name() string { + return NodeSelectionModeRoundRobin +} diff --git a/pkg/solana/client/multinode/node_selector_total_difficulty.go b/pkg/solana/client/multinode/node_selector_total_difficulty.go new file mode 100644 index 000000000..3f3c79de9 --- /dev/null +++ b/pkg/solana/client/multinode/node_selector_total_difficulty.go @@ -0,0 +1,51 @@ +package client + +import ( + "math/big" +) + +type totalDifficultyNodeSelector[ + CHAIN_ID ID, + RPC any, +] []Node[CHAIN_ID, RPC] + +func NewTotalDifficultyNodeSelector[ + CHAIN_ID ID, + RPC any, +](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC] { + return totalDifficultyNodeSelector[CHAIN_ID, RPC](nodes) +} + +func (s totalDifficultyNodeSelector[CHAIN_ID, RPC]) Select() Node[CHAIN_ID, RPC] { 
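+ // Selection summary: prefer the alive node(s) reporting the highest TotalDifficulty;
+ // if no alive node reports one, fall back to any alive node, with ties broken by
+ // node priority via firstOrHighestPriority.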
+ // NodeNoNewHeadsThreshold may not be enabled, in this case all nodes have td == nil + var highestTD *big.Int + var nodes []Node[CHAIN_ID, RPC] + var aliveNodes []Node[CHAIN_ID, RPC] + + for _, n := range s { + state, currentChainInfo := n.StateAndLatest() + if state != NodeStateAlive { + continue + } + + currentTD := currentChainInfo.TotalDifficulty + aliveNodes = append(aliveNodes, n) + if currentTD != nil && (highestTD == nil || currentTD.Cmp(highestTD) >= 0) { + if highestTD == nil || currentTD.Cmp(highestTD) > 0 { + highestTD = currentTD + nodes = nil + } + nodes = append(nodes, n) + } + } + + //If all nodes have td == nil pick one from the nodes that are alive + if len(nodes) == 0 { + return firstOrHighestPriority(aliveNodes) + } + return firstOrHighestPriority(nodes) +} + +func (s totalDifficultyNodeSelector[CHAIN_ID, RPC]) Name() string { + return NodeSelectionModeTotalDifficulty +} diff --git a/pkg/solana/client/multinode/poller.go b/pkg/solana/client/multinode/poller.go new file mode 100644 index 000000000..9ebe1dcfc --- /dev/null +++ b/pkg/solana/client/multinode/poller.go @@ -0,0 +1,93 @@ +package client + +import ( + "context" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +// Poller is a component that polls a function at a given interval +// and delivers the result to a channel. It is used by multinode to poll +// for new heads and implements the Subscription interface. +type Poller[T any] struct { + services.Service + eng *services.Engine + + pollingInterval time.Duration + pollingFunc func(ctx context.Context) (T, error) + pollingTimeout time.Duration + channel chan<- T + errCh chan error +} + +// NewPoller creates a new Poller instance and returns a channel to receive the polled data +func NewPoller[ + T any, +](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, lggr logger.Logger) (Poller[T], <-chan T) { + channel := make(chan T) + p := Poller[T]{ + pollingInterval: pollingInterval, + pollingFunc: pollingFunc, + pollingTimeout: pollingTimeout, + channel: channel, + errCh: make(chan error), + } + p.Service, p.eng = services.Config{ + Name: "Poller", + Start: p.start, + Close: p.close, + }.NewServiceEngine(lggr) + return p, channel +} + +var _ Subscription = &Poller[any]{} + +func (p *Poller[T]) start(ctx context.Context) error { + p.eng.Go(p.pollingLoop) + return nil +} + +// Unsubscribe cancels the sending of events to the data channel +func (p *Poller[T]) Unsubscribe() { + _ = p.Close() +} + +func (p *Poller[T]) close() error { + close(p.errCh) + close(p.channel) + return nil +} + +func (p *Poller[T]) Err() <-chan error { + return p.errCh +} + +func (p *Poller[T]) pollingLoop(ctx context.Context) { + ticker := time.NewTicker(p.pollingInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Set polling timeout + pollingCtx, cancelPolling := context.WithTimeout(ctx, p.pollingTimeout) + // Execute polling function + result, err := p.pollingFunc(pollingCtx) + cancelPolling() + if err != nil { + p.eng.Warnf("polling error: %v", err) + continue + } + // Send result to channel or block if channel is full + select { + case p.channel <- result: + case <-ctx.Done(): + return + } + } + } +} diff --git a/pkg/solana/client/multinode/redialbackoff.go b/pkg/solana/client/multinode/redialbackoff.go new file mode 100644 index 000000000..41be2232d --- /dev/null +++ 
b/pkg/solana/client/multinode/redialbackoff.go @@ -0,0 +1,17 @@ +package client + +import ( + "time" + + "github.com/jpillora/backoff" +) + +// NewRedialBackoff is a standard backoff to use for redialling or reconnecting to +// unreachable network endpoints +func NewRedialBackoff() backoff.Backoff { + return backoff.Backoff{ + Min: 1 * time.Second, + Max: 15 * time.Second, + Jitter: true, + } +} diff --git a/pkg/solana/client/multinode/send_only_node.go b/pkg/solana/client/multinode/send_only_node.go new file mode 100644 index 000000000..4f60f566d --- /dev/null +++ b/pkg/solana/client/multinode/send_only_node.go @@ -0,0 +1,181 @@ +package client + +import ( + "context" + "fmt" + "net/url" + "sync" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +type sendOnlyClient[ + CHAIN_ID ID, +] interface { + Close() + ChainID(context.Context) (CHAIN_ID, error) + Dial(ctx context.Context) error +} + +// SendOnlyNode represents one node used as a sendonly +type SendOnlyNode[ + CHAIN_ID ID, + RPC any, +] interface { + // Start may attempt to connect to the node, but should only return error for misconfiguration - never for temporary errors. + Start(context.Context) error + Close() error + + ConfiguredChainID() CHAIN_ID + RPC() RPC + + String() string + // State returns NodeState + State() NodeState + // Name is a unique identifier for this node. + Name() string +} + +// It only supports sending transactions +// It must use an http(s) url +type sendOnlyNode[ + CHAIN_ID ID, + RPC sendOnlyClient[CHAIN_ID], +] struct { + services.StateMachine + + stateMu sync.RWMutex // protects state* fields + state NodeState + + rpc RPC + uri url.URL + log logger.Logger + name string + chainID CHAIN_ID + chStop services.StopChan + wg sync.WaitGroup +} + +// NewSendOnlyNode returns a new sendonly node +func NewSendOnlyNode[ + CHAIN_ID ID, + RPC sendOnlyClient[CHAIN_ID], +]( + lggr logger.Logger, + httpuri url.URL, + name string, + chainID CHAIN_ID, + rpc RPC, +) SendOnlyNode[CHAIN_ID, RPC] { + s := new(sendOnlyNode[CHAIN_ID, RPC]) + s.name = name + s.log = logger.Named(logger.Named(lggr, "SendOnlyNode"), name) + s.log = logger.With(s.log, + "nodeTier", "sendonly", + ) + s.rpc = rpc + s.uri = httpuri + s.chainID = chainID + s.chStop = make(chan struct{}) + return s +} + +func (s *sendOnlyNode[CHAIN_ID, RPC]) Start(ctx context.Context) error { + return s.StartOnce(s.name, func() error { + s.start(ctx) + return nil + }) +} + +// Start setups up and verifies the sendonly node +// Should only be called once in a node's lifecycle +func (s *sendOnlyNode[CHAIN_ID, RPC]) start(startCtx context.Context) { + if s.State() != NodeStateUndialed { + panic(fmt.Sprintf("cannot dial node with state %v", s.state)) + } + + err := s.rpc.Dial(startCtx) + if err != nil { + promPoolRPCNodeTransitionsToUnusable.WithLabelValues(s.chainID.String(), s.name).Inc() + s.log.Errorw("Dial failed: SendOnly Node is unusable", "err", err) + s.setState(NodeStateUnusable) + return + } + s.setState(NodeStateDialed) + + if s.chainID.String() == "0" { + // Skip verification if chainID is zero + s.log.Warn("sendonly rpc ChainID verification skipped") + } else { + chainID, err := s.rpc.ChainID(startCtx) + if err != nil || chainID.String() != s.chainID.String() { + promPoolRPCNodeTransitionsToUnreachable.WithLabelValues(s.chainID.String(), s.name).Inc() + if err != nil { + promPoolRPCNodeTransitionsToUnreachable.WithLabelValues(s.chainID.String(), s.name).Inc() + 
s.log.Errorw(fmt.Sprintf("Verify failed: %v", err), "err", err) + s.setState(NodeStateUnreachable) + } else { + promPoolRPCNodeTransitionsToInvalidChainID.WithLabelValues(s.chainID.String(), s.name).Inc() + s.log.Errorf( + "sendonly rpc ChainID doesn't match local chain ID: RPC ID=%s, local ID=%s, node name=%s", + chainID.String(), + s.chainID.String(), + s.name, + ) + s.setState(NodeStateInvalidChainID) + } + // Since it has failed, spin up the verifyLoop that will keep + // retrying until success + s.wg.Add(1) + go s.verifyLoop() + return + } + } + + promPoolRPCNodeTransitionsToAlive.WithLabelValues(s.chainID.String(), s.name).Inc() + s.setState(NodeStateAlive) + s.log.Infow("Sendonly RPC Node is online", "nodeState", s.state) +} + +func (s *sendOnlyNode[CHAIN_ID, RPC]) Close() error { + return s.StopOnce(s.name, func() error { + s.rpc.Close() + close(s.chStop) + s.wg.Wait() + s.setState(NodeStateClosed) + return nil + }) +} + +func (s *sendOnlyNode[CHAIN_ID, RPC]) ConfiguredChainID() CHAIN_ID { + return s.chainID +} + +func (s *sendOnlyNode[CHAIN_ID, RPC]) RPC() RPC { + return s.rpc +} + +func (s *sendOnlyNode[CHAIN_ID, RPC]) String() string { + return fmt.Sprintf("(%s)%s:%s", Secondary.String(), s.name, s.uri.Redacted()) +} + +func (s *sendOnlyNode[CHAIN_ID, RPC]) setState(state NodeState) (changed bool) { + s.stateMu.Lock() + defer s.stateMu.Unlock() + if s.state == state { + return false + } + s.state = state + return true +} + +func (s *sendOnlyNode[CHAIN_ID, RPC]) State() NodeState { + s.stateMu.RLock() + defer s.stateMu.RUnlock() + return s.state +} + +func (s *sendOnlyNode[CHAIN_ID, RPC]) Name() string { + return s.name +} diff --git a/pkg/solana/client/multinode/send_only_node_lifecycle.go b/pkg/solana/client/multinode/send_only_node_lifecycle.go new file mode 100644 index 000000000..83642feba --- /dev/null +++ b/pkg/solana/client/multinode/send_only_node_lifecycle.go @@ -0,0 +1,65 @@ +package client + +import ( + "fmt" + "time" +) + +// verifyLoop may only be triggered once, on Start, if initial chain ID check +// fails. +// +// It will continue checking until success and then exit permanently. 
+func (s *sendOnlyNode[CHAIN_ID, RPC]) verifyLoop() { + defer s.wg.Done() + ctx, cancel := s.chStop.NewCtx() + defer cancel() + + backoff := NewRedialBackoff() + for { + select { + case <-ctx.Done(): + return + case <-time.After(backoff.Duration()): + } + chainID, err := s.rpc.ChainID(ctx) + if err != nil { + ok := s.IfStarted(func() { + if changed := s.setState(NodeStateUnreachable); changed { + promPoolRPCNodeTransitionsToUnreachable.WithLabelValues(s.chainID.String(), s.name).Inc() + } + }) + if !ok { + return + } + s.log.Errorw(fmt.Sprintf("Verify failed: %v", err), "err", err) + continue + } else if chainID.String() != s.chainID.String() { + ok := s.IfStarted(func() { + if changed := s.setState(NodeStateInvalidChainID); changed { + promPoolRPCNodeTransitionsToInvalidChainID.WithLabelValues(s.chainID.String(), s.name).Inc() + } + }) + if !ok { + return + } + s.log.Errorf( + "sendonly rpc ChainID doesn't match local chain ID: RPC ID=%s, local ID=%s, node name=%s", + chainID.String(), + s.chainID.String(), + s.name, + ) + + continue + } + ok := s.IfStarted(func() { + if changed := s.setState(NodeStateAlive); changed { + promPoolRPCNodeTransitionsToAlive.WithLabelValues(s.chainID.String(), s.name).Inc() + } + }) + if !ok { + return + } + s.log.Infow("Sendonly RPC Node is online", "NodeState", s.state) + return + } +} diff --git a/pkg/solana/client/multinode/transaction_sender.go b/pkg/solana/client/multinode/transaction_sender.go new file mode 100644 index 000000000..fbd5acca5 --- /dev/null +++ b/pkg/solana/client/multinode/transaction_sender.go @@ -0,0 +1,276 @@ +package client + +import ( + "context" + "errors" + "fmt" + "math" + "slices" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +var ( + // PromMultiNodeInvariantViolations reports violation of our assumptions + PromMultiNodeInvariantViolations = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "solana_multi_node_invariant_violations", + Help: "The number of invariant violations", + }, []string{"network", "chainId", "invariant"}) +) + +// TxErrorClassifier - defines interface of a function that transforms raw RPC error into the SendTxReturnCode enum +// (e.g. Successful, Fatal, Retryable, etc.) 
+type TxErrorClassifier[TX any] func(tx TX, err error) SendTxReturnCode + +type sendTxResult struct { + Err error + ResultCode SendTxReturnCode +} + +const sendTxQuorum = 0.7 + +// SendTxRPCClient - defines interface of an RPC used by TransactionSender to broadcast transaction +type SendTxRPCClient[TX any] interface { + // SendTransaction errors returned should include name or other unique identifier of the RPC + SendTransaction(ctx context.Context, tx TX) error +} + +func NewTransactionSender[TX any, CHAIN_ID ID, RPC SendTxRPCClient[TX]]( + lggr logger.Logger, + chainID CHAIN_ID, + chainFamily string, + multiNode *MultiNode[CHAIN_ID, RPC], + txErrorClassifier TxErrorClassifier[TX], + sendTxSoftTimeout time.Duration, +) *TransactionSender[TX, CHAIN_ID, RPC] { + if sendTxSoftTimeout == 0 { + sendTxSoftTimeout = QueryTimeout / 2 + } + return &TransactionSender[TX, CHAIN_ID, RPC]{ + chainID: chainID, + chainFamily: chainFamily, + lggr: logger.Sugared(lggr).Named("TransactionSender").With("chainID", chainID.String()), + multiNode: multiNode, + txErrorClassifier: txErrorClassifier, + sendTxSoftTimeout: sendTxSoftTimeout, + chStop: make(services.StopChan), + } +} + +type TransactionSender[TX any, CHAIN_ID ID, RPC SendTxRPCClient[TX]] struct { + services.StateMachine + chainID CHAIN_ID + chainFamily string + lggr logger.SugaredLogger + multiNode *MultiNode[CHAIN_ID, RPC] + txErrorClassifier TxErrorClassifier[TX] + sendTxSoftTimeout time.Duration // defines max waiting time from first response til responses evaluation + + wg sync.WaitGroup // waits for all reporting goroutines to finish + chStop services.StopChan +} + +// SendTransaction - broadcasts transaction to all the send-only and primary nodes in MultiNode. +// A returned nil or error does not guarantee that the transaction will or won't be included. Additional checks must be +// performed to determine the final state. +// +// Send-only nodes' results are ignored as they tend to return false-positive responses. Broadcast to them is necessary +// to speed up the propagation of TX in the network. +// +// Handling of primary nodes' results consists of collection and aggregation. +// In the collection step, we gather as many results as possible while minimizing waiting time. This operation succeeds +// on one of the following conditions: +// * Received at least one success +// * Received at least one result and `sendTxSoftTimeout` expired +// * Received results from the sufficient number of nodes defined by sendTxQuorum. +// The aggregation is based on the following conditions: +// * If there is at least one success - returns success +// * If there is at least one terminal error - returns terminal error +// * If there is both success and terminal error - returns success and reports invariant violation +// * Otherwise, returns any (effectively random) of the errors. +func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) (SendTxReturnCode, error) { + txResults := make(chan sendTxResult) + txResultsToReport := make(chan sendTxResult) + primaryNodeWg := sync.WaitGroup{} + + ctx, cancel := txSender.chStop.Ctx(ctx) + defer cancel() + + healthyNodesNum := 0 + err := txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) { + if isSendOnly { + txSender.wg.Add(1) + go func() { + defer txSender.wg.Done() + // Send-only nodes' results are ignored as they tend to return false-positive responses. + // Broadcast to them is necessary to speed up the propagation of TX in the network. 
+ _ = txSender.broadcastTxAsync(ctx, rpc, tx) + }() + return + } + + // Primary Nodes + healthyNodesNum++ + primaryNodeWg.Add(1) + go func() { + defer primaryNodeWg.Done() + result := txSender.broadcastTxAsync(ctx, rpc, tx) + select { + case <-ctx.Done(): + return + case txResults <- result: + } + + select { + case <-ctx.Done(): + return + case txResultsToReport <- result: + } + }() + }) + + // This needs to be done in parallel so the reporting knows when it's done (when the channel is closed) + txSender.wg.Add(1) + go func() { + defer txSender.wg.Done() + primaryNodeWg.Wait() + close(txResultsToReport) + close(txResults) + }() + + if err != nil { + return Retryable, err + } + + txSender.wg.Add(1) + go txSender.reportSendTxAnomalies(tx, txResultsToReport) + + return txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) +} + +func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) sendTxResult { + txErr := rpc.SendTransaction(ctx, tx) + txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", txErr) + resultCode := txSender.txErrorClassifier(tx, txErr) + if !slices.Contains(sendTxSuccessfulCodes, resultCode) { + txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", txErr) + } + return sendTxResult{Err: txErr, ResultCode: resultCode} +} + +func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) { + defer txSender.wg.Done() + resultsByCode := sendTxResults{} + // txResults eventually will be closed + for txResult := range txResults { + resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err) + } + + _, _, criticalErr := aggregateTxResults(resultsByCode) + if criticalErr != nil { + txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr) + PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc() + } +} + +type sendTxResults map[SendTxReturnCode][]error + +func aggregateTxResults(resultsByCode sendTxResults) (returnCode SendTxReturnCode, txResult error, err error) { + severeCode, severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) + successCode, successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) + if hasSuccess { + // We assume that primary node would never report false positive txResult for a transaction. + // Thus, if such case occurs it's probably due to misconfiguration or a bug and requires manual intervention. 
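+ // Worked example: if one RPC returned Successful and another returned Fatal, we still
+ // return the success (the tx may yet be included on-chain), and the contradiction is
+ // surfaced to reportSendTxAnomalies as a critical invariant violation.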
+ if hasSevereErrors { + const errMsg = "found contradictions in nodes replies on SendTransaction: got success and severe error" + // return success, since at least 1 node has accepted our broadcasted Tx, and thus it can now be included onchain + return successCode, successResults[0], errors.New(errMsg) + } + + // other errors are temporary - we are safe to return success + return successCode, successResults[0], nil + } + + if hasSevereErrors { + return severeCode, severeErrors[0], nil + } + + // return temporary error + for code, result := range resultsByCode { + return code, result[0], nil + } + + err = fmt.Errorf("expected at least one response on SendTransaction") + return Retryable, err, err +} + +func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan sendTxResult) (SendTxReturnCode, error) { + if healthyNodesNum == 0 { + return Retryable, ErroringNodeError + } + requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum)) + errorsByCode := sendTxResults{} + var softTimeoutChan <-chan time.Time + var resultsCount int +loop: + for { + select { + case <-ctx.Done(): + txSender.lggr.Debugw("Failed to collect of the results before context was done", "tx", tx, "errorsByCode", errorsByCode) + return Retryable, ctx.Err() + case result := <-txResults: + errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err) + resultsCount++ + if slices.Contains(sendTxSuccessfulCodes, result.ResultCode) || resultsCount >= requiredResults { + break loop + } + case <-softTimeoutChan: + txSender.lggr.Debugw("Send Tx soft timeout expired - returning responses we've collected so far", "tx", tx, "resultsCount", resultsCount, "requiredResults", requiredResults) + break loop + } + + if softTimeoutChan == nil { + tm := time.NewTimer(txSender.sendTxSoftTimeout) + softTimeoutChan = tm.C + // we are fine with stopping timer at the end of function + //nolint + defer tm.Stop() + } + } + + // ignore critical error as it's reported in reportSendTxAnomalies + returnCode, result, _ := aggregateTxResults(errorsByCode) + return returnCode, result +} + +func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Start(ctx context.Context) error { + return txSender.StartOnce("TransactionSender", func() error { + return nil + }) +} + +func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Close() error { + return txSender.StopOnce("TransactionSender", func() error { + close(txSender.chStop) + txSender.wg.Wait() + return nil + }) +} + +// findFirstIn - returns the first existing key and value for the slice of keys +func findFirstIn[K comparable, V any](set map[K]V, keys []K) (K, V, bool) { + for _, k := range keys { + if v, ok := set[k]; ok { + return k, v, true + } + } + var zeroK K + var zeroV V + return zeroK, zeroV, false +} diff --git a/pkg/solana/client/multinode/types.go b/pkg/solana/client/multinode/types.go new file mode 100644 index 000000000..51b70e573 --- /dev/null +++ b/pkg/solana/client/multinode/types.go @@ -0,0 +1,108 @@ +package client + +import ( + "context" + "fmt" + "math/big" +) + +// ID represents the base type, for any chain's ID. +// It should be convertible to a string, that can uniquely identify this chain +type ID fmt.Stringer + +// StringID enables using string directly as a ChainID +type StringID string + +func (s StringID) String() string { + return string(s) +} + +// Subscription represents an event subscription where events are +// delivered on a data channel. 
+// This is a generic interface representing the subscriptions used by clients.
+type Subscription interface {
+ // Unsubscribe cancels the sending of events to the data channel
+ // and closes the error channel. Unsubscribe should be callable multiple
+ // times without causing an error.
+ Unsubscribe()
+ // Err returns the subscription error channel. The error channel receives
+ // a value if there is an issue with the subscription (e.g. the network connection
+ // delivering the events has been closed). Only one value will ever be sent.
+ // The error channel is closed by Unsubscribe.
+ Err() <-chan error
+}
+
+// RPCClient includes all the necessary generalized RPC methods used by Node to perform health checks
+type RPCClient[
+ CHAIN_ID ID,
+ HEAD Head,
+] interface {
+ // ChainID - fetches ChainID from the RPC to verify that it matches config
+ ChainID(ctx context.Context) (CHAIN_ID, error)
+ // Dial - prepares the RPC for usage. Can be called on a fresh or closed RPC
+ Dial(ctx context.Context) error
+ // SubscribeToHeads - returns channel and subscription for new heads.
+ SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
+ // SubscribeToFinalizedHeads - returns channel and subscription for finalized heads.
+ SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
+ // Ping - returns error if RPC is not reachable
+ Ping(context.Context) error
+ // IsSyncing - returns true if the RPC is in Syncing state and cannot process calls
+ IsSyncing(ctx context.Context) (bool, error)
+ // UnsubscribeAllExcept - close all subscriptions except `subs`
+ UnsubscribeAllExcept(subs ...Subscription)
+ // Close - closes all subscriptions and aborts all RPC calls
+ Close()
+ // GetInterceptedChainInfo - returns the latest ChainInfo and the highest ChainInfo observed by the application layer.
+ // latest ChainInfo is the most recent value received within a NodeClient's current lifecycle between Dial and DisconnectAll.
+ // highestUserObservations ChainInfo is the highest ChainInfo observed excluding health check calls.
+ // Its values must not be reset.
+ // The results of corresponding calls, to get the most recent head and the latest finalized head, must be
+ // intercepted and reflected in ChainInfo before being returned to a caller. Otherwise, MultiNode is not able to
+ // provide a repeatable read guarantee.
+ // DisconnectAll must reset latest ChainInfo to its default value.
+ // Ensure the implementation does not have a race condition in which values are reset before a request completes,
+ // leaving latest ChainInfo with information from the previous cycle.
+ GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo)
+}
+
+// Head is the interface required by the NodeClient
+type Head interface {
+ BlockNumber() int64
+ BlockDifficulty() *big.Int
+ IsValid() bool
+}
+
+// PoolChainInfoProvider - provides aggregation of nodes pool ChainInfo
+type PoolChainInfoProvider interface {
+ // LatestChainInfo - returns the number of live nodes available in the pool, so we can prevent the last alive node in a pool from being
+ // moved to out-of-sync state. It is better to have one out-of-sync node than no nodes at all.
+ // Returns the highest latest ChainInfo within the alive nodes. E.g. the most recent block number and the highest block number
+ // observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12.
+ LatestChainInfo() (int, ChainInfo)
+ // HighestUserObservations - returns the highest ChainInfo ever observed by any user of MultiNode.
+ HighestUserObservations() ChainInfo +} + +// ChainInfo - defines RPC's or MultiNode's view on the chain +type ChainInfo struct { + BlockNumber int64 + FinalizedBlockNumber int64 + TotalDifficulty *big.Int +} + +func MaxTotalDifficulty(a, b *big.Int) *big.Int { + if a == nil { + if b == nil { + return nil + } + + return big.NewInt(0).Set(b) + } + + if b == nil || a.Cmp(b) >= 0 { + return big.NewInt(0).Set(a) + } + + return big.NewInt(0).Set(b) +} diff --git a/pkg/solana/cmd/chainlink-solana/main.go b/pkg/solana/cmd/chainlink-solana/main.go index 08893e7de..d65f6cbc9 100644 --- a/pkg/solana/cmd/chainlink-solana/main.go +++ b/pkg/solana/cmd/chainlink-solana/main.go @@ -66,10 +66,12 @@ func (c *pluginRelayer) NewRelayer(ctx context.Context, config string, keystore Logger: c.Logger, KeyStore: keystore, } + chain, err := solana.NewChain(&cfg.Solana, opts) if err != nil { return nil, fmt.Errorf("failed to create chain: %w", err) } + ra := &loop.RelayerAdapter{Relayer: solana.NewRelayer(c.Logger, chain, capRegistry), RelayerExt: chain} c.SubService(ra) diff --git a/pkg/solana/config/config.go b/pkg/solana/config/config.go index 9d5cdc5a9..28698c7c3 100644 --- a/pkg/solana/config/config.go +++ b/pkg/solana/config/config.go @@ -146,8 +146,9 @@ func (c *Chain) SetDefaults() { } type Node struct { - Name *string - URL *config.URL + Name *string + URL *config.URL + SendOnly bool } func (n *Node) ValidateConfig() (err error) { diff --git a/pkg/solana/config/multinode.go b/pkg/solana/config/multinode.go new file mode 100644 index 000000000..1755e6ee6 --- /dev/null +++ b/pkg/solana/config/multinode.go @@ -0,0 +1,87 @@ +package config + +import "time" + +type MultiNode struct { + // TODO: Determine current config overlap https://smartcontract-it.atlassian.net/browse/BCI-4065 + // Feature flag + multiNodeEnabled bool + + // Node Configs + pollFailureThreshold uint32 + pollInterval time.Duration + selectionMode string + syncThreshold uint32 + nodeIsSyncingEnabled bool + finalizedBlockPollInterval time.Duration + enforceRepeatableRead bool + deathDeclarationDelay time.Duration + + // Chain Configs + nodeNoNewHeadsThreshold time.Duration + noNewFinalizedHeadsThreshold time.Duration + finalityDepth uint32 + finalityTagEnabled bool + finalizedBlockOffset uint32 +} + +func (c *MultiNode) MultiNodeEnabled() bool { + return c.multiNodeEnabled +} + +func (c *MultiNode) PollFailureThreshold() uint32 { + return c.pollFailureThreshold +} + +func (c *MultiNode) PollInterval() time.Duration { + return c.pollInterval +} + +func (c *MultiNode) SelectionMode() string { + return c.selectionMode +} + +func (c *MultiNode) SyncThreshold() uint32 { + return c.syncThreshold +} + +func (c *MultiNode) NodeIsSyncingEnabled() bool { + return c.nodeIsSyncingEnabled +} + +func (c *MultiNode) FinalizedBlockPollInterval() time.Duration { + return c.finalizedBlockPollInterval +} + +func (c *MultiNode) EnforceRepeatableRead() bool { + return c.enforceRepeatableRead +} + +func (c *MultiNode) DeathDeclarationDelay() time.Duration { + return c.deathDeclarationDelay +} + +func (c *MultiNode) NodeNoNewHeadsThreshold() time.Duration { + return c.nodeNoNewHeadsThreshold +} + +func (c *MultiNode) NoNewFinalizedHeadsThreshold() time.Duration { + return c.noNewFinalizedHeadsThreshold +} + +func (c *MultiNode) FinalityDepth() uint32 { + return c.finalityDepth +} + +func (c *MultiNode) FinalityTagEnabled() bool { + return c.finalityTagEnabled +} + +func (c *MultiNode) FinalizedBlockOffset() uint32 { + return c.finalizedBlockOffset +} + +func (c 
*MultiNode) SetDefaults() { + // TODO: Set defaults for MultiNode config https://smartcontract-it.atlassian.net/browse/BCI-4065 + c.multiNodeEnabled = false +} diff --git a/pkg/solana/config/toml.go b/pkg/solana/config/toml.go index e5eb705e6..90657fd2c 100644 --- a/pkg/solana/config/toml.go +++ b/pkg/solana/config/toml.go @@ -105,6 +105,7 @@ func setFromNode(n, f *Node) { if f.URL != nil { n.URL = f.URL } + n.SendOnly = f.SendOnly } type TOMLConfig struct { @@ -112,6 +113,7 @@ type TOMLConfig struct { // Do not access directly, use [IsEnabled] Enabled *bool Chain + MultiNode Nodes Nodes } @@ -279,8 +281,13 @@ func (c *TOMLConfig) ListNodes() Nodes { return c.Nodes } +func (c *TOMLConfig) MultiNodeConfig() *MultiNode { + return &c.MultiNode +} + func NewDefault() *TOMLConfig { cfg := &TOMLConfig{} - cfg.SetDefaults() + cfg.Chain.SetDefaults() + cfg.MultiNode.SetDefaults() return cfg }
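
A minimal usage sketch (not part of this patch; it assumes a caller that imports this config package) showing how the new feature flag and accessors compose once defaults are applied:

	cfg := config.NewDefault()
	// MultiNode stays disabled by default: SetDefaults currently sets multiNodeEnabled = false.
	if cfg.MultiNodeConfig().MultiNodeEnabled() {
		// wire the MultiNode-backed client path, honoring Node.SendOnly from cfg.ListNodes()
	} else {
		// keep using the existing cached-client path
	}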