From 0b33b1f9f8384de519a021af80e90c5b0e193c09 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 6 Sep 2024 12:07:39 -0400 Subject: [PATCH] Extend client --- pkg/solana/chain.go | 55 +++--- pkg/solana/client/client.go | 70 +++++++ pkg/solana/client/rpc_client.go | 312 -------------------------------- 3 files changed, 95 insertions(+), 342 deletions(-) delete mode 100644 pkg/solana/client/rpc_client.go diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index 7360f7c0c..9a6068f03 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "math/big" "math/rand" "strings" @@ -70,7 +71,7 @@ func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) { if !cfg.IsEnabled() { return nil, fmt.Errorf("cannot create new chain with ID %s: chain is disabled", *cfg.ChainID) } - c, err := newChain(*cfg.ChainID, cfg, cfg.MultiNodeEnabled(), opts.KeyStore, opts.Logger) + c, err := newChain(*cfg.ChainID, cfg, opts.KeyStore, opts.Logger) if err != nil { return nil, err } @@ -88,9 +89,8 @@ type chain struct { lggr logger.Logger // if multiNode is enabled, the clientCache will not be used - multiNodeEnabled bool - multiNode *mn.MultiNode[mn.StringID, *client.RPCClient] - txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient] + multiNode *mn.MultiNode[mn.StringID, *client.Client] + txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.Client] // tracking node chain id for verification clientCache map[string]*verifiedCachedClient // map URL -> {client, chainId} [mainnet/testnet/devnet/localnet] @@ -221,44 +221,43 @@ func (v *verifiedCachedClient) GetAccountInfoWithOpts(ctx context.Context, addr return v.ReaderWriter.GetAccountInfoWithOpts(ctx, addr, opts) } -func newChain(id string, cfg *config.TOMLConfig, multiNodeEnabled bool, ks loop.Keystore, lggr logger.Logger) (*chain, error) { +func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.Logger) (*chain, error) { lggr = logger.With(lggr, "chainID", id, "chain", "solana") var ch = chain{ - id: id, - cfg: cfg, - lggr: logger.Named(lggr, "Chain"), - multiNodeEnabled: multiNodeEnabled, - clientCache: map[string]*verifiedCachedClient{}, + id: id, + cfg: cfg, + lggr: logger.Named(lggr, "Chain"), + clientCache: map[string]*verifiedCachedClient{}, } - if multiNodeEnabled { + if cfg.MultiNodeEnabled() { chainFamily := "solana" mnCfg := cfg.MultiNodeConfig() - var nodes []mn.Node[mn.StringID, *client.RPCClient] + var nodes []mn.Node[mn.StringID, *client.Client] for i, nodeInfo := range cfg.ListNodes() { // create client and check - rpcClient, err := client.NewRPCClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name)) + rpcClient, err := client.NewClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name)) if err != nil { lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error()) continue } - newNode := mn.NewNode[mn.StringID, *client.Head, *client.RPCClient]( + newNode := mn.NewNode[mn.StringID, *client.Head, *client.Client]( mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name, int32(i), mn.StringID(id), 0, rpcClient, chainFamily) nodes = append(nodes, newNode) } - multiNode := mn.NewMultiNode[mn.StringID, *client.RPCClient]( + multiNode := mn.NewMultiNode[mn.StringID, *client.Client]( lggr, mn.NodeSelectionModeRoundRobin, time.Minute, // TODO: set lease duration nodes, - []mn.SendOnlyNode[mn.StringID, *client.RPCClient]{}, + []mn.SendOnlyNode[mn.StringID, *client.Client]{}, mn.StringID(id), chainFamily, mnCfg.DeathDeclarationDelay(), @@ -269,7 +268,7 @@ func newChain(id string, cfg *config.TOMLConfig, multiNodeEnabled bool, ks loop. return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false) } - txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient]( + txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.Client]( lggr, mn.StringID(id), chainFamily, @@ -363,7 +362,7 @@ func (c *chain) ChainID() string { // getClient returns a client, randomly selecting one from available and valid nodes func (c *chain) getClient() (client.ReaderWriter, error) { - if c.multiNodeEnabled { + if c.cfg.MultiNodeEnabled() { return c.multiNode.SelectRPC() } @@ -446,14 +445,12 @@ func (c *chain) Start(ctx context.Context) error { c.lggr.Debug("Starting txm") c.lggr.Debug("Starting balance monitor") var ms services.MultiStart - if c.multiNodeEnabled { + startAll := []services.StartClose{c.txm, c.balanceMonitor} + if c.cfg.MultiNodeEnabled() { c.lggr.Debug("Starting multinode") - err := ms.Start(ctx, c.multiNode, c.txSender) - if err != nil { - return err - } + startAll = append(startAll, c.multiNode, c.txSender) } - return ms.Start(ctx, c.txm, c.balanceMonitor) + return ms.Start(ctx, startAll...) }) } @@ -462,14 +459,12 @@ func (c *chain) Close() error { c.lggr.Debug("Stopping") c.lggr.Debug("Stopping txm") c.lggr.Debug("Stopping balance monitor") - if c.multiNodeEnabled { + closeAll := []io.Closer{c.txm, c.balanceMonitor} + if c.cfg.MultiNodeEnabled() { c.lggr.Debug("Stopping multinode") - err := services.CloseAll(c.multiNode, c.txSender) - if err != nil { - return err - } + closeAll = append(closeAll, c.multiNode, c.txSender) } - return services.CloseAll(c.txm, c.balanceMonitor) + return services.CloseAll(closeAll...) }) } diff --git a/pkg/solana/client/client.go b/pkg/solana/client/client.go index d2294824d..21111fab4 100644 --- a/pkg/solana/client/client.go +++ b/pkg/solana/client/client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/big" "time" mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" @@ -67,6 +68,25 @@ type Client struct { requestGroup *singleflight.Group } +type Head struct { + rpc.GetBlockResult +} + +func (h *Head) BlockNumber() int64 { + if h.BlockHeight == nil { + return 0 + } + return int64(*h.BlockHeight) +} + +func (h *Head) BlockDifficulty() *big.Int { + return nil +} + +func (h *Head) IsValid() bool { + return true +} + func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) { return &Client{ url: endpoint, @@ -81,6 +101,56 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, }, nil } +var _ mn.RPCClient[mn.StringID, *Head] = (*Client)(nil) +var _ mn.SendTxRPCClient[*solana.Transaction] = (*Client)(nil) + +// TODO: BCI-4061: Implement Client for MultiNode + +func (c *Client) Dial(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (c *Client) SubscribeToHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) { + //TODO implement me + panic("implement me") +} + +func (c *Client) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) { + //TODO implement me + panic("implement me") +} + +func (c *Client) Ping(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (c *Client) IsSyncing(ctx context.Context) (bool, error) { + //TODO implement me + panic("implement me") +} + +func (c *Client) UnsubscribeAllExcept(subs ...mn.Subscription) { + //TODO implement me + panic("implement me") +} + +func (c *Client) Close() { + //TODO implement me + panic("implement me") +} + +func (c *Client) GetInterceptedChainInfo() (latest, highestUserObservations mn.ChainInfo) { + //TODO implement me + panic("implement me") +} + +func (c *Client) SendTransaction(ctx context.Context, tx *solana.Transaction) error { + // TODO: Implement + return nil +} + func (c *Client) latency(name string) func() { start := time.Now() return func() { diff --git a/pkg/solana/client/rpc_client.go b/pkg/solana/client/rpc_client.go deleted file mode 100644 index c9ceeab6a..000000000 --- a/pkg/solana/client/rpc_client.go +++ /dev/null @@ -1,312 +0,0 @@ -package client - -import ( - "context" - "errors" - "fmt" - "math/big" - "time" - - "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/rpc" - "golang.org/x/sync/singleflight" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" - - mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" - "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" - "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" -) - -var _ ReaderWriter = (*RPCClient)(nil) - -type Head struct { - rpc.GetBlockResult -} - -func (h *Head) BlockNumber() int64 { - if h.BlockHeight == nil { - return 0 - } - return int64(*h.BlockHeight) -} - -func (h *Head) BlockDifficulty() *big.Int { - return nil -} - -func (h *Head) IsValid() bool { - return true -} - -type RPCClient struct { - url string - rpc *rpc.Client - skipPreflight bool // to enable or disable preflight checks - commitment rpc.CommitmentType - maxRetries *uint - txTimeout time.Duration - contextDuration time.Duration - log logger.Logger - - // provides a duplicate function call suppression mechanism - requestGroup *singleflight.Group -} - -// TODO: BCI-4061: Implement RPC Client for MultiNode - -func (c *RPCClient) Dial(ctx context.Context) error { - //TODO implement me - panic("implement me") -} - -func (c *RPCClient) SubscribeToHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) { - //TODO implement me - panic("implement me") -} - -func (c *RPCClient) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) { - //TODO implement me - panic("implement me") -} - -func (c *RPCClient) Ping(ctx context.Context) error { - //TODO implement me - panic("implement me") -} - -func (c *RPCClient) IsSyncing(ctx context.Context) (bool, error) { - //TODO implement me - panic("implement me") -} - -func (c *RPCClient) UnsubscribeAllExcept(subs ...mn.Subscription) { - //TODO implement me - panic("implement me") -} - -func (c *RPCClient) Close() { - //TODO implement me - panic("implement me") -} - -func (c *RPCClient) GetInterceptedChainInfo() (latest, highestUserObservations mn.ChainInfo) { - //TODO implement me - panic("implement me") -} - -func NewRPCClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*RPCClient, error) { - return &RPCClient{ - url: endpoint, - rpc: rpc.New(endpoint), - skipPreflight: cfg.SkipPreflight(), - commitment: cfg.Commitment(), - maxRetries: cfg.MaxRetries(), - txTimeout: cfg.TxTimeout(), - contextDuration: requestTimeout, - log: log, - requestGroup: &singleflight.Group{}, - }, nil -} - -func (c *RPCClient) latency(name string) func() { - start := time.Now() - return func() { - monitor.SetClientLatency(time.Since(start), name, c.url) - } -} - -func (c *RPCClient) Balance(addr solana.PublicKey) (uint64, error) { - done := c.latency("balance") - defer done() - - ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) - defer cancel() - - v, err, _ := c.requestGroup.Do(fmt.Sprintf("GetBalance(%s)", addr.String()), func() (interface{}, error) { - return c.rpc.GetBalance(ctx, addr, c.commitment) - }) - if err != nil { - return 0, err - } - res := v.(*rpc.GetBalanceResult) - return res.Value, err -} - -func (c *RPCClient) SlotHeight() (uint64, error) { - return c.SlotHeightWithCommitment(rpc.CommitmentProcessed) // get the latest slot height -} - -func (c *RPCClient) SlotHeightWithCommitment(commitment rpc.CommitmentType) (uint64, error) { - done := c.latency("slot_height") - defer done() - - ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) - defer cancel() - v, err, _ := c.requestGroup.Do("GetSlotHeight", func() (interface{}, error) { - return c.rpc.GetSlot(ctx, commitment) - }) - return v.(uint64), err -} - -func (c *RPCClient) GetAccountInfoWithOpts(ctx context.Context, addr solana.PublicKey, opts *rpc.GetAccountInfoOpts) (*rpc.GetAccountInfoResult, error) { - done := c.latency("account_info") - defer done() - - ctx, cancel := context.WithTimeout(ctx, c.contextDuration) - defer cancel() - opts.Commitment = c.commitment // overrides passed in value - use defined client commitment type - return c.rpc.GetAccountInfoWithOpts(ctx, addr, opts) -} - -func (c *RPCClient) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) { - done := c.latency("latest_blockhash") - defer done() - - ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) - defer cancel() - - v, err, _ := c.requestGroup.Do("GetLatestBlockhash", func() (interface{}, error) { - return c.rpc.GetLatestBlockhash(ctx, c.commitment) - }) - return v.(*rpc.GetLatestBlockhashResult), err -} - -func (c *RPCClient) ChainID(ctx context.Context) (mn.StringID, error) { - done := c.latency("chain_id") - defer done() - - ctx, cancel := context.WithTimeout(ctx, c.contextDuration) - defer cancel() - v, err, _ := c.requestGroup.Do("GetGenesisHash", func() (interface{}, error) { - return c.rpc.GetGenesisHash(ctx) - }) - if err != nil { - return "", err - } - hash := v.(solana.Hash) - - var network string - switch hash.String() { - case DevnetGenesisHash: - network = "devnet" - case TestnetGenesisHash: - network = "testnet" - case MainnetGenesisHash: - network = "mainnet" - default: - c.log.Warnf("unknown genesis hash - assuming solana chain is 'localnet'") - network = "localnet" - } - return mn.StringID(network), nil -} - -func (c *RPCClient) GetFeeForMessage(msg string) (uint64, error) { - done := c.latency("fee_for_message") - defer done() - - // msg is base58 encoded data - - ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) - defer cancel() - res, err := c.rpc.GetFeeForMessage(ctx, msg, c.commitment) - if err != nil { - return 0, fmt.Errorf("error in GetFeeForMessage: %w", err) - } - - if res == nil || res.Value == nil { - return 0, errors.New("nil pointer in GetFeeForMessage") - } - return *res.Value, nil -} - -// https://docs.solana.com/developing/clients/jsonrpc-api#getsignaturestatuses -func (c *RPCClient) SignatureStatuses(ctx context.Context, sigs []solana.Signature) ([]*rpc.SignatureStatusesResult, error) { - done := c.latency("signature_statuses") - defer done() - - ctx, cancel := context.WithTimeout(ctx, c.contextDuration) - defer cancel() - - // searchTransactionHistory = false - res, err := c.rpc.GetSignatureStatuses(ctx, false, sigs...) - if err != nil { - return nil, fmt.Errorf("error in GetSignatureStatuses: %w", err) - } - - if res == nil || res.Value == nil { - return nil, errors.New("nil pointer in GetSignatureStatuses") - } - return res.Value, nil -} - -// https://docs.solana.com/developing/clients/jsonrpc-api#simulatetransaction -// opts - (optional) use `nil` to use defaults -func (c *RPCClient) SimulateTx(ctx context.Context, tx *solana.Transaction, opts *rpc.SimulateTransactionOpts) (*rpc.SimulateTransactionResult, error) { - done := c.latency("simulate_tx") - defer done() - - ctx, cancel := context.WithTimeout(ctx, c.contextDuration) - defer cancel() - - if opts == nil { - opts = &rpc.SimulateTransactionOpts{ - SigVerify: true, // verify signature - Commitment: c.commitment, - } - } - - res, err := c.rpc.SimulateTransactionWithOpts(ctx, tx, opts) - if err != nil { - return nil, fmt.Errorf("error in SimulateTransactionWithOpts: %w", err) - } - - if res == nil || res.Value == nil { - return nil, errors.New("nil pointer in SimulateTransactionWithOpts") - } - - return res.Value, nil -} - -func (c *RPCClient) SendTransaction(ctx context.Context, tx *solana.Transaction) error { - // TODO: Implement - return nil -} - -func (c *RPCClient) SendTx(ctx context.Context, tx *solana.Transaction) (solana.Signature, error) { - done := c.latency("send_tx") - defer done() - - ctx, cancel := context.WithTimeout(ctx, c.txTimeout) - defer cancel() - - opts := rpc.TransactionOpts{ - SkipPreflight: c.skipPreflight, - PreflightCommitment: c.commitment, - MaxRetries: c.maxRetries, - } - - return c.rpc.SendTransactionWithOpts(ctx, tx, opts) -} - -func (c *RPCClient) GetLatestBlock() (*rpc.GetBlockResult, error) { - // get latest confirmed slot - slot, err := c.SlotHeightWithCommitment(c.commitment) - if err != nil { - return nil, fmt.Errorf("GetLatestBlock.SlotHeight: %w", err) - } - - // get block based on slot - done := c.latency("latest_block") - defer done() - ctx, cancel := context.WithTimeout(context.Background(), c.txTimeout) - defer cancel() - v, err, _ := c.requestGroup.Do("GetBlockWithOpts", func() (interface{}, error) { - version := uint64(0) // pull all tx types (legacy + v0) - return c.rpc.GetBlockWithOpts(ctx, slot, &rpc.GetBlockOpts{ - Commitment: c.commitment, - MaxSupportedTransactionVersion: &version, - }) - }) - return v.(*rpc.GetBlockResult), err -}