Skip to content

Commit

Permalink
MultiNode integration setup
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Aug 20, 2024
1 parent 99ecc4a commit 60673aa
Show file tree
Hide file tree
Showing 22 changed files with 4,103 additions and 11 deletions.
7 changes: 4 additions & 3 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func (v *verifiedCachedClient) verifyChainID() (bool, error) {
v.chainIDVerifiedLock.Lock()
defer v.chainIDVerifiedLock.Unlock()

v.chainID, err = v.ReaderWriter.ChainID()
strID, err := v.ReaderWriter.ChainID(context.Background())
v.chainID = strID.String()
if err != nil {
v.chainIDVerified = false
return v.chainIDVerified, fmt.Errorf("failed to fetch ChainID in verifiedCachedClient: %w", err)
Expand Down Expand Up @@ -186,13 +187,13 @@ func (v *verifiedCachedClient) LatestBlockhash() (*rpc.GetLatestBlockhashResult,
return v.ReaderWriter.LatestBlockhash()
}

func (v *verifiedCachedClient) ChainID() (string, error) {
func (v *verifiedCachedClient) ChainID(ctx context.Context) (client.StringID, error) {
verified, err := v.verifyChainID()
if !verified {
return "", err
}

return v.chainID, nil
return client.StringID(v.chainID), nil
}

func (v *verifiedCachedClient) GetFeeForMessage(msg string) (uint64, error) {
Expand Down
268 changes: 268 additions & 0 deletions pkg/solana/chain_multinode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
package solana

import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"

solanago "github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/programs/system"
"github.com/smartcontractkit/chainlink-common/pkg/chains"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
relaytypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
)

func NewMultiNodeChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) {
if !cfg.IsEnabled() {
return nil, fmt.Errorf("cannot create new chain with ID %s: chain is disabled", *cfg.ChainID)
}
c, err := newMultiNodeChain(*cfg.ChainID, cfg, opts.KeyStore, opts.Logger)
if err != nil {
return nil, err
}
return c, nil
}

var _ Chain = (*multiNodeChain)(nil)

type multiNodeChain struct {
services.StateMachine
id string
cfg *config.TOMLConfig
multiNode *mn.MultiNode[client.StringID, *client.RpcClient]
txSender *mn.TransactionSender[*solanago.Transaction, client.StringID, *client.RpcClient]
txm *txm.Txm
balanceMonitor services.Service
lggr logger.Logger

clientLock sync.RWMutex
}

func newMultiNodeChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.Logger) (*multiNodeChain, error) {
lggr = logger.With(lggr, "chainID", id, "chain", "solana")

chainFamily := "solana"

cfg.BlockHistoryPollPeriod()

mnCfg := cfg.MultiNodeConfig()

var nodes []mn.Node[client.StringID, *client.RpcClient]

for i, nodeInfo := range cfg.ListNodes() {
// create client and check
rpcClient, err := client.NewRpcClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name))
if err != nil {
lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error())
continue
}

newNode := mn.NewNode[client.StringID, *client.Head, *client.RpcClient](
mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name,
int32(i), client.StringID(id), 0, rpcClient, chainFamily)

nodes = append(nodes, newNode)
}

multiNode := mn.NewMultiNode[client.StringID, *client.RpcClient](
lggr,
mn.NodeSelectionModeRoundRobin,
time.Duration(0), // TODO: set lease duration
nodes,
[]mn.SendOnlyNode[client.StringID, *client.RpcClient]{}, // TODO: no send only nodes?
client.StringID(id),
chainFamily,
time.Duration(0), // TODO: set deathDeclarationDelay
)

classifySendError := func(tx *solanago.Transaction, err error) mn.SendTxReturnCode {
return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false)
}

txSender := mn.NewTransactionSender[*solanago.Transaction, client.StringID, *client.RpcClient](
lggr,
client.StringID(id),
chainFamily,
multiNode,
classifySendError,
0, // use the default value provided by the implementation
)

var ch = multiNodeChain{
id: id,
cfg: cfg,
multiNode: multiNode,
txSender: txSender,
lggr: logger.Named(lggr, "Chain"),
}

tc := func() (client.ReaderWriter, error) {
return ch.multiNode.SelectRPC()
}

ch.txm = txm.NewTxm(ch.id, tc, cfg, ks, lggr)
bc := func() (monitor.BalanceClient, error) {
return ch.multiNode.SelectRPC()
}
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc)
return &ch, nil
}

// ChainService interface
func (c *multiNodeChain) GetChainStatus(ctx context.Context) (relaytypes.ChainStatus, error) {
toml, err := c.cfg.TOMLString()
if err != nil {
return relaytypes.ChainStatus{}, err
}
return relaytypes.ChainStatus{
ID: c.id,
Enabled: c.cfg.IsEnabled(),
Config: toml,
}, nil
}

func (c *multiNodeChain) ListNodeStatuses(ctx context.Context, pageSize int32, pageToken string) (stats []relaytypes.NodeStatus, nextPageToken string, total int, err error) {
return chains.ListNodeStatuses(int(pageSize), pageToken, c.listNodeStatuses)
}

func (c *multiNodeChain) Transact(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error {
return c.sendTx(ctx, from, to, amount, balanceCheck)
}

func (c *multiNodeChain) listNodeStatuses(start, end int) ([]relaytypes.NodeStatus, int, error) {
stats := make([]relaytypes.NodeStatus, 0)
total := len(c.cfg.Nodes)
if start >= total {
return stats, total, chains.ErrOutOfRange
}
if end > total {
end = total
}
nodes := c.cfg.Nodes[start:end]
for _, node := range nodes {
stat, err := config.NodeStatus(node, c.ChainID())
if err != nil {
return stats, total, err
}
stats = append(stats, stat)
}
return stats, total, nil
}

func (c *multiNodeChain) Name() string {
return c.lggr.Name()
}

func (c *multiNodeChain) ID() string {
return c.id
}

func (c *multiNodeChain) Config() config.Config {
return c.cfg
}

func (c *multiNodeChain) TxManager() TxManager {
return c.txm
}

func (c *multiNodeChain) Reader() (client.Reader, error) {
return c.multiNode.SelectRPC()
}

func (c *multiNodeChain) ChainID() string {
return c.id
}

func (c *multiNodeChain) Start(ctx context.Context) error {
return c.StartOnce("Chain", func() error {
c.lggr.Debug("Starting")
c.lggr.Debug("Starting txm")
c.lggr.Debug("Starting balance monitor")
var ms services.MultiStart
return ms.Start(ctx, c.txm, c.balanceMonitor)
})
}

func (c *multiNodeChain) Close() error {
return c.StopOnce("Chain", func() error {
c.lggr.Debug("Stopping")
c.lggr.Debug("Stopping txm")
c.lggr.Debug("Stopping balance monitor")
return services.CloseAll(c.txm, c.balanceMonitor)
})
}

func (c *multiNodeChain) Ready() error {
return errors.Join(
c.StateMachine.Ready(),
c.txm.Ready(),
)
}

func (c *multiNodeChain) HealthReport() map[string]error {
report := map[string]error{c.Name(): c.Healthy()}
services.CopyHealth(report, c.txm.HealthReport())
return report
}

func (c *multiNodeChain) sendTx(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error {
reader, err := c.Reader()
if err != nil {
return fmt.Errorf("chain unreachable: %w", err)
}

fromKey, err := solanago.PublicKeyFromBase58(from)
if err != nil {
return fmt.Errorf("failed to parse from key: %w", err)
}
toKey, err := solanago.PublicKeyFromBase58(to)
if err != nil {
return fmt.Errorf("failed to parse to key: %w", err)
}
if !amount.IsUint64() {
return fmt.Errorf("amount %s overflows uint64", amount)
}
amountI := amount.Uint64()

blockhash, err := reader.LatestBlockhash()
if err != nil {
return fmt.Errorf("failed to get latest block hash: %w", err)
}
tx, err := solanago.NewTransaction(
[]solanago.Instruction{
system.NewTransferInstruction(
amountI,
fromKey,
toKey,
).Build(),
},
blockhash.Value.Blockhash,
solanago.TransactionPayer(fromKey),
)
if err != nil {
return fmt.Errorf("failed to create tx: %w", err)
}

if balanceCheck {
if err = solanaValidateBalance(reader, fromKey, amountI, tx.Message.ToBase64()); err != nil {
return fmt.Errorf("failed to validate balance: %w", err)
}
}

txm := c.TxManager()
err = txm.Enqueue("", tx)
if err != nil {
return fmt.Errorf("transaction failed: %w", err)
}
return nil
}
3 changes: 2 additions & 1 deletion pkg/solana/chain_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package solana

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -174,7 +175,7 @@ func TestSolanaChain_VerifiedClient(t *testing.T) {
testChain.id = "incorrect"
c, err = testChain.verifiedClient(node)
assert.NoError(t, err)
_, err = c.ChainID()
_, err = c.ChainID(context.Background())
// expect error from id mismatch (even if using a cached client) when performing RPC calls
assert.Error(t, err)
assert.Equal(t, fmt.Sprintf("client returned mismatched chain id (expected: %s, got: %s): %s", "incorrect", "devnet", node.URL), err.Error())
Expand Down
8 changes: 4 additions & 4 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Reader interface {
Balance(addr solana.PublicKey) (uint64, error)
SlotHeight() (uint64, error)
LatestBlockhash() (*rpc.GetLatestBlockhashResult, error)
ChainID() (string, error)
ChainID(ctx context.Context) (StringID, error)
GetFeeForMessage(msg string) (uint64, error)
GetLatestBlock() (*rpc.GetBlockResult, error)
}
Expand Down Expand Up @@ -142,11 +142,11 @@ func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) {
return v.(*rpc.GetLatestBlockhashResult), err
}

func (c *Client) ChainID() (string, error) {
func (c *Client) ChainID(ctx context.Context) (StringID, error) {
done := c.latency("chain_id")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()
v, err, _ := c.requestGroup.Do("GetGenesisHash", func() (interface{}, error) {
return c.rpc.GetGenesisHash(ctx)
Expand All @@ -168,7 +168,7 @@ func (c *Client) ChainID() (string, error) {
c.log.Warnf("unknown genesis hash - assuming solana chain is 'localnet'")
network = "localnet"
}
return network, nil
return StringID(network), nil
}

func (c *Client) GetFeeForMessage(msg string) (uint64, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/solana/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestClient_Reader_Integration(t *testing.T) {
assert.Equal(t, uint64(5000), fee)

// get chain ID based on gensis hash
network, err := c.ChainID()
network, err := c.ChainID(context.Background())
assert.NoError(t, err)
assert.Equal(t, "localnet", network)

Expand Down Expand Up @@ -120,7 +120,7 @@ func TestClient_Reader_ChainID(t *testing.T) {

// get chain ID based on gensis hash
for _, n := range networks {
network, err := c.ChainID()
network, err := c.ChainID(context.Background())
assert.NoError(t, err)
assert.Equal(t, n, network)
}
Expand Down
Loading

0 comments on commit 60673aa

Please sign in to comment.