Skip to content

Commit

Permalink
Add chain multinode flag
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Sep 5, 2024
1 parent b8d6755 commit 2cb4d77
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 281 deletions.
91 changes: 85 additions & 6 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (Chain, error) {
if !cfg.IsEnabled() {
return nil, fmt.Errorf("cannot create new chain with ID %s: chain is disabled", *cfg.ChainID)
}
c, err := newChain(*cfg.ChainID, cfg, opts.KeyStore, opts.Logger)
c, err := newChain(*cfg.ChainID, cfg, cfg.MultiNodeEnabled(), opts.KeyStore, opts.Logger)
if err != nil {
return nil, err
}
Expand All @@ -87,6 +87,11 @@ type chain struct {
balanceMonitor services.Service
lggr logger.Logger

// if multiNode is enabled, the clientCache will not be used
multiNodeEnabled bool
multiNode *mn.MultiNode[mn.StringID, *client.RPCClient]
txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient]

// tracking node chain id for verification
clientCache map[string]*verifiedCachedClient // map URL -> {client, chainId} [mainnet/testnet/devnet/localnet]
clientLock sync.RWMutex
Expand Down Expand Up @@ -216,14 +221,70 @@ func (v *verifiedCachedClient) GetAccountInfoWithOpts(ctx context.Context, addr
return v.ReaderWriter.GetAccountInfoWithOpts(ctx, addr, opts)
}

func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.Logger) (*chain, error) {
func newChain(id string, cfg *config.TOMLConfig, multiNodeEnabled bool, ks loop.Keystore, lggr logger.Logger) (*chain, error) {
lggr = logger.With(lggr, "chainID", id, "chain", "solana")
var ch = chain{
id: id,
cfg: cfg,
lggr: logger.Named(lggr, "Chain"),
clientCache: map[string]*verifiedCachedClient{},
id: id,
cfg: cfg,
lggr: logger.Named(lggr, "Chain"),
multiNodeEnabled: multiNodeEnabled,
clientCache: map[string]*verifiedCachedClient{},
}

if multiNodeEnabled {
chainFamily := "solana"

mnCfg := cfg.MultiNodeConfig()

var nodes []mn.Node[mn.StringID, *client.RPCClient]

for i, nodeInfo := range cfg.ListNodes() {
// create client and check
rpcClient, err := client.NewRPCClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name))
if err != nil {
lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error())
continue
}

newNode := mn.NewNode[mn.StringID, *client.Head, *client.RPCClient](
mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name,
int32(i), mn.StringID(id), 0, rpcClient, chainFamily)

nodes = append(nodes, newNode)
}

multiNode := mn.NewMultiNode[mn.StringID, *client.RPCClient](
lggr,
mn.NodeSelectionModeRoundRobin,
time.Minute, // TODO: set lease duration
nodes,
[]mn.SendOnlyNode[mn.StringID, *client.RPCClient]{},
mn.StringID(id),
chainFamily,
mnCfg.DeathDeclarationDelay(),
)

// TODO: implement error classification
classifySendError := func(tx *solanago.Transaction, err error) mn.SendTxReturnCode {
return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false)
}

txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.RPCClient](
lggr,
mn.StringID(id),
chainFamily,
multiNode,
classifySendError,
0, // use the default value provided by the implementation
)

ch.multiNode = multiNode
ch.txSender = txSender

// clientCache will not be used if multinode is enabled
ch.clientCache = nil
}

tc := func() (client.ReaderWriter, error) {
return ch.getClient()
}
Expand Down Expand Up @@ -302,6 +363,10 @@ func (c *chain) ChainID() string {

// getClient returns a client, randomly selecting one from available and valid nodes
func (c *chain) getClient() (client.ReaderWriter, error) {
if c.multiNodeEnabled {
return c.multiNode.SelectRPC()
}

var node *config.Node
var client client.ReaderWriter
nodes := c.cfg.ListNodes()
Expand Down Expand Up @@ -381,6 +446,13 @@ func (c *chain) Start(ctx context.Context) error {
c.lggr.Debug("Starting txm")
c.lggr.Debug("Starting balance monitor")
var ms services.MultiStart
if c.multiNodeEnabled {
c.lggr.Debug("Starting multinode")
err := ms.Start(ctx, c.multiNode, c.txSender)
if err != nil {
return err
}
}
return ms.Start(ctx, c.txm, c.balanceMonitor)
})
}
Expand All @@ -390,6 +462,13 @@ func (c *chain) Close() error {
c.lggr.Debug("Stopping")
c.lggr.Debug("Stopping txm")
c.lggr.Debug("Stopping balance monitor")
if c.multiNodeEnabled {
c.lggr.Debug("Stopping multinode")
err := services.CloseAll(c.multiNode, c.txSender)
if err != nil {
return err
}
}
return services.CloseAll(c.txm, c.balanceMonitor)
})
}
Expand Down
Loading

0 comments on commit 2cb4d77

Please sign in to comment.