Skip to content

Commit

Permalink
Extend client
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Sep 6, 2024
1 parent 2cb4d77 commit 0b33b1f
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 342 deletions.
55 changes: 25 additions & 30 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"math/big"
"math/rand"
"strings"
Expand Down Expand Up @@ -70,7 +71,7 @@ func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) {
if !cfg.IsEnabled() {
return nil, fmt.Errorf("cannot create new chain with ID %s: chain is disabled", *cfg.ChainID)
}
c, err := newChain(*cfg.ChainID, cfg, cfg.MultiNodeEnabled(), opts.KeyStore, opts.Logger)
c, err := newChain(*cfg.ChainID, cfg, opts.KeyStore, opts.Logger)
if err != nil {
return nil, err
}
Expand All @@ -88,9 +89,8 @@ type chain struct {
lggr logger.Logger

// if multiNode is enabled, the clientCache will not be used
multiNodeEnabled bool
multiNode *mn.MultiNode[mn.StringID, *client.RPCClient]
txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient]
multiNode *mn.MultiNode[mn.StringID, *client.Client]
txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.Client]

// tracking node chain id for verification
clientCache map[string]*verifiedCachedClient // map URL -> {client, chainId} [mainnet/testnet/devnet/localnet]
Expand Down Expand Up @@ -221,44 +221,43 @@ func (v *verifiedCachedClient) GetAccountInfoWithOpts(ctx context.Context, addr
return v.ReaderWriter.GetAccountInfoWithOpts(ctx, addr, opts)
}

func newChain(id string, cfg *config.TOMLConfig, multiNodeEnabled bool, ks loop.Keystore, lggr logger.Logger) (*chain, error) {
func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.Logger) (*chain, error) {
lggr = logger.With(lggr, "chainID", id, "chain", "solana")
var ch = chain{
id: id,
cfg: cfg,
lggr: logger.Named(lggr, "Chain"),
multiNodeEnabled: multiNodeEnabled,
clientCache: map[string]*verifiedCachedClient{},
id: id,
cfg: cfg,
lggr: logger.Named(lggr, "Chain"),
clientCache: map[string]*verifiedCachedClient{},
}

if multiNodeEnabled {
if cfg.MultiNodeEnabled() {
chainFamily := "solana"

mnCfg := cfg.MultiNodeConfig()

var nodes []mn.Node[mn.StringID, *client.RPCClient]
var nodes []mn.Node[mn.StringID, *client.Client]

for i, nodeInfo := range cfg.ListNodes() {
// create client and check
rpcClient, err := client.NewRPCClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name))
rpcClient, err := client.NewClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name))
if err != nil {
lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error())
continue
}

newNode := mn.NewNode[mn.StringID, *client.Head, *client.RPCClient](
newNode := mn.NewNode[mn.StringID, *client.Head, *client.Client](
mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name,
int32(i), mn.StringID(id), 0, rpcClient, chainFamily)

nodes = append(nodes, newNode)
}

multiNode := mn.NewMultiNode[mn.StringID, *client.RPCClient](
multiNode := mn.NewMultiNode[mn.StringID, *client.Client](
lggr,
mn.NodeSelectionModeRoundRobin,
time.Minute, // TODO: set lease duration
nodes,
[]mn.SendOnlyNode[mn.StringID, *client.RPCClient]{},
[]mn.SendOnlyNode[mn.StringID, *client.Client]{},
mn.StringID(id),
chainFamily,
mnCfg.DeathDeclarationDelay(),
Expand All @@ -269,7 +268,7 @@ func newChain(id string, cfg *config.TOMLConfig, multiNodeEnabled bool, ks loop.
return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false)
}

txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient](
txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.Client](
lggr,
mn.StringID(id),
chainFamily,
Expand Down Expand Up @@ -363,7 +362,7 @@ func (c *chain) ChainID() string {

// getClient returns a client, randomly selecting one from available and valid nodes
func (c *chain) getClient() (client.ReaderWriter, error) {
if c.multiNodeEnabled {
if c.cfg.MultiNodeEnabled() {
return c.multiNode.SelectRPC()
}

Expand Down Expand Up @@ -446,14 +445,12 @@ func (c *chain) Start(ctx context.Context) error {
c.lggr.Debug("Starting txm")
c.lggr.Debug("Starting balance monitor")
var ms services.MultiStart
if c.multiNodeEnabled {
startAll := []services.StartClose{c.txm, c.balanceMonitor}
if c.cfg.MultiNodeEnabled() {
c.lggr.Debug("Starting multinode")
err := ms.Start(ctx, c.multiNode, c.txSender)
if err != nil {
return err
}
startAll = append(startAll, c.multiNode, c.txSender)
}
return ms.Start(ctx, c.txm, c.balanceMonitor)
return ms.Start(ctx, startAll...)
})
}

Expand All @@ -462,14 +459,12 @@ func (c *chain) Close() error {
c.lggr.Debug("Stopping")
c.lggr.Debug("Stopping txm")
c.lggr.Debug("Stopping balance monitor")
if c.multiNodeEnabled {
closeAll := []io.Closer{c.txm, c.balanceMonitor}
if c.cfg.MultiNodeEnabled() {
c.lggr.Debug("Stopping multinode")
err := services.CloseAll(c.multiNode, c.txSender)
if err != nil {
return err
}
closeAll = append(closeAll, c.multiNode, c.txSender)
}
return services.CloseAll(c.txm, c.balanceMonitor)
return services.CloseAll(closeAll...)
})
}

Expand Down
70 changes: 70 additions & 0 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/big"
"time"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
Expand Down Expand Up @@ -67,6 +68,25 @@ type Client struct {
requestGroup *singleflight.Group
}

type Head struct {
rpc.GetBlockResult
}

func (h *Head) BlockNumber() int64 {
if h.BlockHeight == nil {
return 0
}
return int64(*h.BlockHeight)
}

func (h *Head) BlockDifficulty() *big.Int {
return nil
}

func (h *Head) IsValid() bool {
return true
}

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
return &Client{
url: endpoint,
Expand All @@ -81,6 +101,56 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration,
}, nil
}

var _ mn.RPCClient[mn.StringID, *Head] = (*Client)(nil)
var _ mn.SendTxRPCClient[*solana.Transaction] = (*Client)(nil)

// TODO: BCI-4061: Implement Client for MultiNode

func (c *Client) Dial(ctx context.Context) error {
//TODO implement me
panic("implement me")
}

func (c *Client) SubscribeToHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) Ping(ctx context.Context) error {
//TODO implement me
panic("implement me")
}

func (c *Client) IsSyncing(ctx context.Context) (bool, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) UnsubscribeAllExcept(subs ...mn.Subscription) {
//TODO implement me
panic("implement me")
}

func (c *Client) Close() {
//TODO implement me
panic("implement me")
}

func (c *Client) GetInterceptedChainInfo() (latest, highestUserObservations mn.ChainInfo) {
//TODO implement me
panic("implement me")
}

func (c *Client) SendTransaction(ctx context.Context, tx *solana.Transaction) error {
// TODO: Implement
return nil
}

func (c *Client) latency(name string) func() {
start := time.Now()
return func() {
Expand Down
Loading

0 comments on commit 0b33b1f

Please sign in to comment.