From 2cb4d77bf71db88ec70b1605ed6b5adacef3c763 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 5 Sep 2024 12:19:03 -0400 Subject: [PATCH] Add chain multinode flag --- pkg/solana/chain.go | 91 +++++++- pkg/solana/chain_multinode.go | 266 ------------------------ pkg/solana/cmd/chainlink-solana/main.go | 10 +- 3 files changed, 86 insertions(+), 281 deletions(-) delete mode 100644 pkg/solana/chain_multinode.go diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index bc2dd845a..7360f7c0c 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -70,7 +70,7 @@ func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) { if !cfg.IsEnabled() { return nil, fmt.Errorf("cannot create new chain with ID %s: chain is disabled", *cfg.ChainID) } - c, err := newChain(*cfg.ChainID, cfg, opts.KeyStore, opts.Logger) + c, err := newChain(*cfg.ChainID, cfg, cfg.MultiNodeEnabled(), opts.KeyStore, opts.Logger) if err != nil { return nil, err } @@ -87,6 +87,11 @@ type chain struct { balanceMonitor services.Service lggr logger.Logger + // if multiNode is enabled, the clientCache will not be used + multiNodeEnabled bool + multiNode *mn.MultiNode[mn.StringID, *client.RPCClient] + txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient] + // tracking node chain id for verification clientCache map[string]*verifiedCachedClient // map URL -> {client, chainId} [mainnet/testnet/devnet/localnet] clientLock sync.RWMutex @@ -216,14 +221,70 @@ func (v *verifiedCachedClient) GetAccountInfoWithOpts(ctx context.Context, addr return v.ReaderWriter.GetAccountInfoWithOpts(ctx, addr, opts) } -func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.Logger) (*chain, error) { +func newChain(id string, cfg *config.TOMLConfig, multiNodeEnabled bool, ks loop.Keystore, lggr logger.Logger) (*chain, error) { lggr = logger.With(lggr, "chainID", id, "chain", "solana") var ch = chain{ - id: id, - cfg: cfg, - lggr: logger.Named(lggr, "Chain"), - clientCache: map[string]*verifiedCachedClient{}, + id: id, + cfg: cfg, + lggr: logger.Named(lggr, "Chain"), + multiNodeEnabled: multiNodeEnabled, + clientCache: map[string]*verifiedCachedClient{}, } + + if multiNodeEnabled { + chainFamily := "solana" + + mnCfg := cfg.MultiNodeConfig() + + var nodes []mn.Node[mn.StringID, *client.RPCClient] + + for i, nodeInfo := range cfg.ListNodes() { + // create client and check + rpcClient, err := client.NewRPCClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name)) + if err != nil { + lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error()) + continue + } + + newNode := mn.NewNode[mn.StringID, *client.Head, *client.RPCClient]( + mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name, + int32(i), mn.StringID(id), 0, rpcClient, chainFamily) + + nodes = append(nodes, newNode) + } + + multiNode := mn.NewMultiNode[mn.StringID, *client.RPCClient]( + lggr, + mn.NodeSelectionModeRoundRobin, + time.Minute, // TODO: set lease duration + nodes, + []mn.SendOnlyNode[mn.StringID, *client.RPCClient]{}, + mn.StringID(id), + chainFamily, + mnCfg.DeathDeclarationDelay(), + ) + + // TODO: implement error classification + classifySendError := func(tx *solanago.Transaction, err error) mn.SendTxReturnCode { + return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false) + } + + txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient]( + lggr, + mn.StringID(id), + chainFamily, + multiNode, + classifySendError, + 0, // use the default value provided by the implementation + ) + + ch.multiNode = multiNode + ch.txSender = txSender + + // clientCache will not be used if multinode is enabled + ch.clientCache = nil + } + tc := func() (client.ReaderWriter, error) { return ch.getClient() } @@ -302,6 +363,10 @@ func (c *chain) ChainID() string { // getClient returns a client, randomly selecting one from available and valid nodes func (c *chain) getClient() (client.ReaderWriter, error) { + if c.multiNodeEnabled { + return c.multiNode.SelectRPC() + } + var node *config.Node var client client.ReaderWriter nodes := c.cfg.ListNodes() @@ -381,6 +446,13 @@ func (c *chain) Start(ctx context.Context) error { c.lggr.Debug("Starting txm") c.lggr.Debug("Starting balance monitor") var ms services.MultiStart + if c.multiNodeEnabled { + c.lggr.Debug("Starting multinode") + err := ms.Start(ctx, c.multiNode, c.txSender) + if err != nil { + return err + } + } return ms.Start(ctx, c.txm, c.balanceMonitor) }) } @@ -390,6 +462,13 @@ func (c *chain) Close() error { c.lggr.Debug("Stopping") c.lggr.Debug("Stopping txm") c.lggr.Debug("Stopping balance monitor") + if c.multiNodeEnabled { + c.lggr.Debug("Stopping multinode") + err := services.CloseAll(c.multiNode, c.txSender) + if err != nil { + return err + } + } return services.CloseAll(c.txm, c.balanceMonitor) }) } diff --git a/pkg/solana/chain_multinode.go b/pkg/solana/chain_multinode.go deleted file mode 100644 index d8e4f133c..000000000 --- a/pkg/solana/chain_multinode.go +++ /dev/null @@ -1,266 +0,0 @@ -package solana - -import ( - "context" - "errors" - "fmt" - "math/big" - "time" - - solanago "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/programs/system" - "github.com/smartcontractkit/chainlink-common/pkg/chains" - "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/loop" - "github.com/smartcontractkit/chainlink-common/pkg/services" - relaytypes "github.com/smartcontractkit/chainlink-common/pkg/types" - - "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" - mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" - "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" - "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" - "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm" -) - -func NewMultiNodeChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) { - if !cfg.IsEnabled() { - return nil, fmt.Errorf("cannot create new chain with ID %s: chain is disabled", *cfg.ChainID) - } - c, err := newMultiNodeChain(*cfg.ChainID, cfg, opts.KeyStore, opts.Logger) - if err != nil { - return nil, err - } - return c, nil -} - -var _ Chain = (*multiNodeChain)(nil) - -type multiNodeChain struct { - services.StateMachine - id string - cfg *config.TOMLConfig - multiNode *mn.MultiNode[mn.StringID, *client.RPCClient] - txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient] - txm *txm.Txm - balanceMonitor services.Service - lggr logger.Logger -} - -func newMultiNodeChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.Logger) (*multiNodeChain, error) { - lggr = logger.With(lggr, "chainID", id, "chain", "solana") - - chainFamily := "solana" - - cfg.BlockHistoryPollPeriod() - - mnCfg := cfg.MultiNodeConfig() - - var nodes []mn.Node[mn.StringID, *client.RPCClient] - - for i, nodeInfo := range cfg.ListNodes() { - // create client and check - rpcClient, err := client.NewRPCClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name)) - if err != nil { - lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error()) - continue - } - - newNode := mn.NewNode[mn.StringID, *client.Head, *client.RPCClient]( - mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name, - int32(i), mn.StringID(id), 0, rpcClient, chainFamily) - - nodes = append(nodes, newNode) - } - - multiNode := mn.NewMultiNode[mn.StringID, *client.RPCClient]( - lggr, - mn.NodeSelectionModeRoundRobin, - time.Duration(0), // TODO: set lease duration - nodes, - []mn.SendOnlyNode[mn.StringID, *client.RPCClient]{}, // TODO: no send only nodes? - mn.StringID(id), - chainFamily, - time.Duration(0), // TODO: set deathDeclarationDelay - ) - - classifySendError := func(tx *solanago.Transaction, err error) mn.SendTxReturnCode { - return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false) - } - - txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient]( - lggr, - mn.StringID(id), - chainFamily, - multiNode, - classifySendError, - 0, // use the default value provided by the implementation - ) - - var ch = multiNodeChain{ - id: id, - cfg: cfg, - multiNode: multiNode, - txSender: txSender, - lggr: logger.Named(lggr, "Chain"), - } - - tc := func() (client.ReaderWriter, error) { - return ch.multiNode.SelectRPC() - } - - ch.txm = txm.NewTxm(ch.id, tc, cfg, ks, lggr) - bc := func() (monitor.BalanceClient, error) { - return ch.multiNode.SelectRPC() - } - ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc) - return &ch, nil -} - -// ChainService interface -func (c *multiNodeChain) GetChainStatus(ctx context.Context) (relaytypes.ChainStatus, error) { - toml, err := c.cfg.TOMLString() - if err != nil { - return relaytypes.ChainStatus{}, err - } - return relaytypes.ChainStatus{ - ID: c.id, - Enabled: c.cfg.IsEnabled(), - Config: toml, - }, nil -} - -func (c *multiNodeChain) ListNodeStatuses(ctx context.Context, pageSize int32, pageToken string) (stats []relaytypes.NodeStatus, nextPageToken string, total int, err error) { - return chains.ListNodeStatuses(int(pageSize), pageToken, c.listNodeStatuses) -} - -func (c *multiNodeChain) Transact(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error { - return c.sendTx(ctx, from, to, amount, balanceCheck) -} - -func (c *multiNodeChain) listNodeStatuses(start, end int) ([]relaytypes.NodeStatus, int, error) { - stats := make([]relaytypes.NodeStatus, 0) - total := len(c.cfg.Nodes) - if start >= total { - return stats, total, chains.ErrOutOfRange - } - if end > total { - end = total - } - nodes := c.cfg.Nodes[start:end] - for _, node := range nodes { - stat, err := config.NodeStatus(node, c.ChainID()) - if err != nil { - return stats, total, err - } - stats = append(stats, stat) - } - return stats, total, nil -} - -func (c *multiNodeChain) Name() string { - return c.lggr.Name() -} - -func (c *multiNodeChain) ID() string { - return c.id -} - -func (c *multiNodeChain) Config() config.Config { - return c.cfg -} - -func (c *multiNodeChain) TxManager() TxManager { - return c.txm -} - -func (c *multiNodeChain) Reader() (client.Reader, error) { - return c.multiNode.SelectRPC() -} - -func (c *multiNodeChain) ChainID() string { - return c.id -} - -func (c *multiNodeChain) Start(ctx context.Context) error { - return c.StartOnce("Chain", func() error { - c.lggr.Debug("Starting") - c.lggr.Debug("Starting txm") - c.lggr.Debug("Starting balance monitor") - var ms services.MultiStart - return ms.Start(ctx, c.txm, c.balanceMonitor) - }) -} - -func (c *multiNodeChain) Close() error { - return c.StopOnce("Chain", func() error { - c.lggr.Debug("Stopping") - c.lggr.Debug("Stopping txm") - c.lggr.Debug("Stopping balance monitor") - return services.CloseAll(c.txm, c.balanceMonitor) - }) -} - -func (c *multiNodeChain) Ready() error { - return errors.Join( - c.StateMachine.Ready(), - c.txm.Ready(), - ) -} - -func (c *multiNodeChain) HealthReport() map[string]error { - report := map[string]error{c.Name(): c.Healthy()} - services.CopyHealth(report, c.txm.HealthReport()) - return report -} - -func (c *multiNodeChain) sendTx(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error { - reader, err := c.Reader() - if err != nil { - return fmt.Errorf("chain unreachable: %w", err) - } - - fromKey, err := solanago.PublicKeyFromBase58(from) - if err != nil { - return fmt.Errorf("failed to parse from key: %w", err) - } - toKey, err := solanago.PublicKeyFromBase58(to) - if err != nil { - return fmt.Errorf("failed to parse to key: %w", err) - } - if !amount.IsUint64() { - return fmt.Errorf("amount %s overflows uint64", amount) - } - amountI := amount.Uint64() - - blockhash, err := reader.LatestBlockhash() - if err != nil { - return fmt.Errorf("failed to get latest block hash: %w", err) - } - tx, err := solanago.NewTransaction( - []solanago.Instruction{ - system.NewTransferInstruction( - amountI, - fromKey, - toKey, - ).Build(), - }, - blockhash.Value.Blockhash, - solanago.TransactionPayer(fromKey), - ) - if err != nil { - return fmt.Errorf("failed to create tx: %w", err) - } - - if balanceCheck { - if err = solanaValidateBalance(reader, fromKey, amountI, tx.Message.ToBase64()); err != nil { - return fmt.Errorf("failed to validate balance: %w", err) - } - } - - txm := c.TxManager() - err = txm.Enqueue("", tx) - if err != nil { - return fmt.Errorf("transaction failed: %w", err) - } - return nil -} diff --git a/pkg/solana/cmd/chainlink-solana/main.go b/pkg/solana/cmd/chainlink-solana/main.go index 6a966a693..d65f6cbc9 100644 --- a/pkg/solana/cmd/chainlink-solana/main.go +++ b/pkg/solana/cmd/chainlink-solana/main.go @@ -67,15 +67,7 @@ func (c *pluginRelayer) NewRelayer(ctx context.Context, config string, keystore KeyStore: keystore, } - var chain solana.Chain - var err error - - if cfg.Solana.MultiNodeConfig().MultiNodeEnabled() { - chain, err = solana.NewMultiNodeChain(&cfg.Solana, opts) - } else { - chain, err = solana.NewChain(&cfg.Solana, opts) - } - + chain, err := solana.NewChain(&cfg.Solana, opts) if err != nil { return nil, fmt.Errorf("failed to create chain: %w", err) }