diff --git a/pkg/node/helper.go b/pkg/node/helper.go new file mode 100644 index 000000000..4e94f1e25 --- /dev/null +++ b/pkg/node/helper.go @@ -0,0 +1,213 @@ +// Copyright (C) 2022, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. +package node + +import ( + "encoding/json" + "fmt" + "github.com/ava-labs/avalanche-cli/pkg/application" + "github.com/ava-labs/avalanche-cli/pkg/constants" + "github.com/ava-labs/avalanche-cli/pkg/models" + "github.com/ava-labs/avalanche-cli/pkg/ssh" + "github.com/ava-labs/avalanche-cli/pkg/utils" + "github.com/ava-labs/avalanche-cli/pkg/ux" + "github.com/ava-labs/avalanchego/api/info" + "sync" +) + +func checkCluster(app *application.Avalanche, clusterName string) error { + _, err := getClusterNodes(app, clusterName) + return err +} + +func getClusterNodes(app *application.Avalanche, clusterName string) ([]string, error) { + if exists, err := checkClusterExists(app, clusterName); err != nil || !exists { + return nil, fmt.Errorf("cluster %q not found", clusterName) + } + clustersConfig, err := app.LoadClustersConfig() + if err != nil { + return nil, err + } + clusterNodes := clustersConfig.Clusters[clusterName].Nodes + if len(clusterNodes) == 0 { + return nil, fmt.Errorf("no nodes found in cluster %s", clusterName) + } + return clusterNodes, nil +} + +func checkClusterExists(app *application.Avalanche, clusterName string) (bool, error) { + clustersConfig := models.ClustersConfig{} + if app.ClustersConfigExists() { + var err error + clustersConfig, err = app.LoadClustersConfig() + if err != nil { + return false, err + } + } + _, ok := clustersConfig.Clusters[clusterName] + return ok, nil +} + +func checkHostsAreRPCCompatible(app *application.Avalanche, hosts []*models.Host, subnetName string) error { + incompatibleNodes, err := getRPCIncompatibleNodes(app, hosts, subnetName) + if err != nil { + return err + } + if len(incompatibleNodes) > 0 { + sc, err := app.LoadSidecar(subnetName) + if err != nil { + return err + } + ux.Logger.PrintToUser("Either modify your Avalanche Go version or modify your VM version") + ux.Logger.PrintToUser("To modify your Avalanche Go version: https://docs.avax.network/nodes/maintain/upgrade-your-avalanchego-node") + switch sc.VM { + case models.SubnetEvm: + ux.Logger.PrintToUser("To modify your Subnet-EVM version: https://docs.avax.network/build/subnet/upgrade/upgrade-subnet-vm") + case models.CustomVM: + ux.Logger.PrintToUser("To modify your Custom VM binary: avalanche subnet upgrade vm %s --config", subnetName) + } + ux.Logger.PrintToUser("Yoy can use \"avalanche node upgrade\" to upgrade Avalanche Go and/or Subnet-EVM to their latest versions") + return fmt.Errorf("the Avalanche Go version of node(s) %s is incompatible with VM RPC version of %s", incompatibleNodes, subnetName) + } + return nil +} + +func getRPCIncompatibleNodes(app *application.Avalanche, hosts []*models.Host, subnetName string) ([]string, error) { + ux.Logger.PrintToUser("Checking compatibility of node(s) avalanche go RPC protocol version with Subnet EVM RPC of subnet %s ...", subnetName) + sc, err := app.LoadSidecar(subnetName) + if err != nil { + return nil, err + } + wg := sync.WaitGroup{} + wgResults := models.NodeResults{} + for _, host := range hosts { + wg.Add(1) + go func(nodeResults *models.NodeResults, host *models.Host) { + defer wg.Done() + if resp, err := ssh.RunSSHCheckAvalancheGoVersion(host); err != nil { + nodeResults.AddResult(host.GetCloudID(), nil, err) + return + } else { + if _, rpcVersion, err := parseAvalancheGoOutput(resp); err != nil { + nodeResults.AddResult(host.GetCloudID(), nil, err) + } else { + nodeResults.AddResult(host.GetCloudID(), rpcVersion, err) + } + } + }(&wgResults, host) + } + wg.Wait() + if wgResults.HasErrors() { + return nil, fmt.Errorf("failed to get rpc protocol version for node(s) %s", wgResults.GetErrorHostMap()) + } + incompatibleNodes := []string{} + for nodeID, rpcVersionI := range wgResults.GetResultMap() { + rpcVersion := rpcVersionI.(uint32) + if rpcVersion != uint32(sc.RPCVersion) { + incompatibleNodes = append(incompatibleNodes, nodeID) + } + } + if len(incompatibleNodes) > 0 { + ux.Logger.PrintToUser(fmt.Sprintf("Compatible Avalanche Go RPC version is %d", sc.RPCVersion)) + } + return incompatibleNodes, nil +} + +func parseAvalancheGoOutput(byteValue []byte) (string, uint32, error) { + reply := map[string]interface{}{} + if err := json.Unmarshal(byteValue, &reply); err != nil { + return "", 0, err + } + resultMap := reply["result"] + resultJSON, err := json.Marshal(resultMap) + if err != nil { + return "", 0, err + } + + nodeVersionReply := info.GetNodeVersionReply{} + if err := json.Unmarshal(resultJSON, &nodeVersionReply); err != nil { + return "", 0, err + } + return nodeVersionReply.VMVersions["platform"], uint32(nodeVersionReply.RPCProtocolVersion), nil +} + +func disconnectHosts(hosts []*models.Host) { + for _, host := range hosts { + _ = host.Disconnect() + } +} + +func getWSEndpoint(endpoint string, blockchainID string) string { + return models.NewDevnetNetwork(endpoint, 0).BlockchainWSEndpoint(blockchainID) +} + +func getPublicEndpoints(app *application.Avalanche, clusterName string) ([]string, error) { + endpoints := []string{} + clusterConfig, err := app.GetClusterConfig(clusterName) + if err != nil { + return nil, err + } + publicNodes := clusterConfig.APINodes + if clusterConfig.Network.Kind == models.Devnet { + publicNodes = clusterConfig.Nodes + } + for _, cloudID := range publicNodes { + nodeConfig, err := app.LoadClusterNodeConfig(cloudID) + if err != nil { + return nil, err + } + endpoints = append(endpoints, getAvalancheGoEndpoint(nodeConfig.ElasticIP)) + } + return endpoints, nil +} + +func getRPCEndpoint(endpoint string, blockchainID string) string { + return models.NewDevnetNetwork(endpoint, 0).BlockchainEndpoint(blockchainID) +} + +func getAvalancheGoEndpoint(ip string) string { + return fmt.Sprintf("http://%s:%d", ip, constants.AvalanchegoAPIPort) +} + +func getUnhealthyNodes(hosts []*models.Host) ([]string, error) { + wg := sync.WaitGroup{} + wgResults := models.NodeResults{} + for _, host := range hosts { + wg.Add(1) + go func(nodeResults *models.NodeResults, host *models.Host) { + defer wg.Done() + if resp, err := ssh.RunSSHCheckHealthy(host); err != nil { + nodeResults.AddResult(host.GetCloudID(), nil, err) + return + } else { + if isHealthy, err := parseHealthyOutput(resp); err != nil { + nodeResults.AddResult(host.GetCloudID(), nil, err) + } else { + nodeResults.AddResult(host.GetCloudID(), isHealthy, err) + } + } + }(&wgResults, host) + } + wg.Wait() + if wgResults.HasErrors() { + return nil, fmt.Errorf("failed to get health status for node(s) %s", wgResults.GetErrorHostMap()) + } + return utils.Filter(wgResults.GetNodeList(), func(nodeID string) bool { + return !wgResults.GetResultMap()[nodeID].(bool) + }), nil +} + +func parseHealthyOutput(byteValue []byte) (bool, error) { + var result map[string]interface{} + if err := json.Unmarshal(byteValue, &result); err != nil { + return false, err + } + isHealthyInterface, ok := result["result"].(map[string]interface{}) + if ok { + isHealthy, ok := isHealthyInterface["healthy"].(bool) + if ok { + return isHealthy, nil + } + } + return false, fmt.Errorf("unable to parse node healthy status") +} diff --git a/pkg/node/sync.go b/pkg/node/sync.go new file mode 100644 index 000000000..9f5d609c1 --- /dev/null +++ b/pkg/node/sync.go @@ -0,0 +1,235 @@ +// Copyright (C) 2022, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. +package node + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/ava-labs/avalanche-cli/pkg/ansible" + "github.com/ava-labs/avalanche-cli/pkg/application" + "github.com/ava-labs/avalanche-cli/pkg/models" + "github.com/ava-labs/avalanche-cli/pkg/ssh" + "github.com/ava-labs/avalanche-cli/pkg/subnet" + "github.com/ava-labs/avalanche-cli/pkg/utils" + "github.com/ava-labs/avalanche-cli/pkg/ux" + "github.com/ava-labs/avalanchego/utils/set" + "sync" +) + +func SyncSubnet(app *application.Avalanche, clusterName, blockchainName string, avoidChecks bool, subnetAliases []string) error { + if err := checkCluster(app, clusterName); err != nil { + return err + } + clusterConfig, err := app.GetClusterConfig(clusterName) + if err != nil { + return err + } + if _, err := subnet.ValidateSubnetNameAndGetChains(app, []string{blockchainName}); err != nil { + return err + } + hosts, err := ansible.GetInventoryFromAnsibleInventoryFile(app.GetAnsibleInventoryDirPath(clusterName)) + if err != nil { + return err + } + defer disconnectHosts(hosts) + if !avoidChecks { + if err := checkHostsAreBootstrapped(hosts); err != nil { + return err + } + if err := checkHostsAreHealthy(hosts); err != nil { + return err + } + if err := checkHostsAreRPCCompatible(app, hosts, blockchainName); err != nil { + return err + } + } + if err := prepareSubnetPlugin(app, hosts, blockchainName); err != nil { + return err + } + if err := trackSubnet(app, hosts, clusterName, clusterConfig.Network, blockchainName, subnetAliases); err != nil { + return err + } + ux.Logger.PrintToUser("Node(s) successfully started syncing with Blockchain!") + ux.Logger.PrintToUser(fmt.Sprintf("Check node blockchain syncing status with avalanche node status %s --blockchain %s", clusterName, blockchainName)) + return nil +} + +// prepareSubnetPlugin creates subnet plugin to all nodes in the cluster +func prepareSubnetPlugin(app *application.Avalanche, hosts []*models.Host, blockchainName string) error { + sc, err := app.LoadSidecar(blockchainName) + if err != nil { + return err + } + wg := sync.WaitGroup{} + wgResults := models.NodeResults{} + for _, host := range hosts { + wg.Add(1) + go func(nodeResults *models.NodeResults, host *models.Host) { + defer wg.Done() + if err := ssh.RunSSHCreatePlugin(host, sc); err != nil { + nodeResults.AddResult(host.NodeID, nil, err) + } + }(&wgResults, host) + } + wg.Wait() + if wgResults.HasErrors() { + return fmt.Errorf("failed to upload plugin to node(s) %s", wgResults.GetErrorHostMap()) + } + return nil +} + +func trackSubnet( + app *application.Avalanche, + hosts []*models.Host, + clusterName string, + network models.Network, + blockchainName string, + subnetAliases []string, +) error { + // load cluster config + clusterConfig, err := app.GetClusterConfig(clusterName) + if err != nil { + return err + } + // and get list of subnets + allSubnets := utils.Unique(append(clusterConfig.Subnets, blockchainName)) + + // load sidecar to get subnet blockchain ID + sc, err := app.LoadSidecar(blockchainName) + if err != nil { + return err + } + blockchainID := sc.Networks[network.Name()].BlockchainID + + wg := sync.WaitGroup{} + wgResults := models.NodeResults{} + subnetAliases = append([]string{blockchainName}, subnetAliases...) + for _, host := range hosts { + wg.Add(1) + go func(nodeResults *models.NodeResults, host *models.Host) { + defer wg.Done() + if err := ssh.RunSSHStopNode(host); err != nil { + nodeResults.AddResult(host.NodeID, nil, err) + } + + if err := ssh.RunSSHRenderAvagoAliasConfigFile( + host, + blockchainID.String(), + subnetAliases, + ); err != nil { + nodeResults.AddResult(host.NodeID, nil, err) + } + if err := ssh.RunSSHRenderAvalancheNodeConfig( + app, + host, + network, + allSubnets, + clusterConfig.IsAPIHost(host.GetCloudID()), + ); err != nil { + nodeResults.AddResult(host.NodeID, nil, err) + } + if err := ssh.RunSSHSyncSubnetData(app, host, network, blockchainName); err != nil { + nodeResults.AddResult(host.NodeID, nil, err) + } + if err := ssh.RunSSHStartNode(host); err != nil { + nodeResults.AddResult(host.NodeID, nil, err) + return + } + }(&wgResults, host) + } + wg.Wait() + if wgResults.HasErrors() { + return fmt.Errorf("failed to track subnet for node(s) %s", wgResults.GetErrorHostMap()) + } + + // update slice of subnets synced by the cluster + clusterConfig.Subnets = allSubnets + err = app.SetClusterConfig(network.ClusterName, clusterConfig) + if err != nil { + return err + } + + // update slice of blockchain endpoints with the cluster ones + networkInfo := sc.Networks[clusterConfig.Network.Name()] + rpcEndpoints := set.Of(networkInfo.RPCEndpoints...) + wsEndpoints := set.Of(networkInfo.WSEndpoints...) + publicEndpoints, err := getPublicEndpoints(app, clusterName) + if err != nil { + return err + } + for _, publicEndpoint := range publicEndpoints { + rpcEndpoints.Add(getRPCEndpoint(publicEndpoint, networkInfo.BlockchainID.String())) + wsEndpoints.Add(getWSEndpoint(publicEndpoint, networkInfo.BlockchainID.String())) + } + networkInfo.RPCEndpoints = rpcEndpoints.List() + networkInfo.WSEndpoints = wsEndpoints.List() + sc.Networks[clusterConfig.Network.Name()] = networkInfo + return app.UpdateSidecar(&sc) +} + +func checkHostsAreBootstrapped(hosts []*models.Host) error { + notBootstrappedNodes, err := getNotBootstrappedNodes(hosts) + if err != nil { + return err + } + if len(notBootstrappedNodes) > 0 { + return fmt.Errorf("node(s) %s are not bootstrapped yet, please try again later", notBootstrappedNodes) + } + return nil +} + +func checkHostsAreHealthy(hosts []*models.Host) error { + ux.Logger.PrintToUser("Checking if node(s) are healthy...") + unhealthyNodes, err := getUnhealthyNodes(hosts) + if err != nil { + return err + } + if len(unhealthyNodes) > 0 { + return fmt.Errorf("node(s) %s are not healthy, please check the issue and try again later", unhealthyNodes) + } + return nil +} + +func getNotBootstrappedNodes(hosts []*models.Host) ([]string, error) { + wg := sync.WaitGroup{} + wgResults := models.NodeResults{} + for _, host := range hosts { + wg.Add(1) + go func(nodeResults *models.NodeResults, host *models.Host) { + defer wg.Done() + if resp, err := ssh.RunSSHCheckBootstrapped(host); err != nil { + nodeResults.AddResult(host.GetCloudID(), nil, err) + return + } else { + if isBootstrapped, err := parseBootstrappedOutput(resp); err != nil { + nodeResults.AddResult(host.GetCloudID(), nil, err) + } else { + nodeResults.AddResult(host.GetCloudID(), isBootstrapped, err) + } + } + }(&wgResults, host) + } + wg.Wait() + if wgResults.HasErrors() { + return nil, fmt.Errorf("failed to get avalanchego bootstrap status for node(s) %s", wgResults.GetErrorHostMap()) + } + return utils.Filter(wgResults.GetNodeList(), func(nodeID string) bool { + return !wgResults.GetResultMap()[nodeID].(bool) + }), nil +} + +func parseBootstrappedOutput(byteValue []byte) (bool, error) { + var result map[string]interface{} + if err := json.Unmarshal(byteValue, &result); err != nil { + return false, err + } + isBootstrappedInterface, ok := result["result"].(map[string]interface{}) + if ok { + isBootstrapped, ok := isBootstrappedInterface["isBootstrapped"].(bool) + if ok { + return isBootstrapped, nil + } + } + return false, errors.New("unable to parse node bootstrap status") +} diff --git a/pkg/subnet/helpers.go b/pkg/subnet/helpers.go index 49b1e0b80..9a55ea23a 100644 --- a/pkg/subnet/helpers.go +++ b/pkg/subnet/helpers.go @@ -3,10 +3,22 @@ package subnet import ( + "encoding/json" + "errors" + "fmt" "github.com/ava-labs/avalanche-cli/pkg/application" + "github.com/ava-labs/avalanche-cli/pkg/constants" "github.com/ava-labs/avalanche-cli/pkg/key" "github.com/ava-labs/avalanche-cli/pkg/models" "github.com/ava-labs/avalanche-cli/pkg/utils" + "os" + "path/filepath" + "unicode" +) + +var ( + errIllegalNameCharacter = errors.New( + "illegal name character: only letters, no special characters allowed") ) func GetDefaultSubnetAirdropKeyInfo(app *application.Avalanche, subnetName string) (string, string, string, error) { @@ -67,3 +79,66 @@ func GetSubnetAirdropKeyInfo( } return "", "", "", nil } + +func ValidateSubnetNameAndGetChains(app *application.Avalanche, args []string) ([]string, error) { + // this should not be necessary but some bright guy might just be creating + // the genesis by hand or something... + if err := checkInvalidSubnetNames(args[0]); err != nil { + return nil, fmt.Errorf("subnet name %s is invalid: %w", args[0], err) + } + // Check subnet exists + // TODO create a file that lists chains by subnet for fast querying + chains, err := getChainsInSubnet(app, args[0]) + if err != nil { + return nil, fmt.Errorf("failed to getChainsInSubnet: %w", err) + } + + if len(chains) == 0 { + return nil, errors.New("Invalid subnet " + args[0]) + } + + return chains, nil +} + +func checkInvalidSubnetNames(name string) error { + // this is currently exactly the same code as in avalanchego/vms/platformvm/create_chain_tx.go + for _, r := range name { + if r > unicode.MaxASCII || !(unicode.IsLetter(r) || unicode.IsNumber(r) || r == ' ') { + return errIllegalNameCharacter + } + } + return nil +} + +func getChainsInSubnet(app *application.Avalanche, blockchainName string) ([]string, error) { + subnets, err := os.ReadDir(app.GetSubnetDir()) + if err != nil { + return nil, fmt.Errorf("failed to read baseDir: %w", err) + } + + chains := []string{} + + for _, s := range subnets { + if !s.IsDir() { + continue + } + sidecarFile := filepath.Join(app.GetSubnetDir(), s.Name(), constants.SidecarFileName) + if _, err := os.Stat(sidecarFile); err == nil { + // read in sidecar file + jsonBytes, err := os.ReadFile(sidecarFile) + if err != nil { + return nil, fmt.Errorf("failed reading file %s: %w", sidecarFile, err) + } + + var sc models.Sidecar + err = json.Unmarshal(jsonBytes, &sc) + if err != nil { + return nil, fmt.Errorf("failed unmarshaling file %s: %w", sidecarFile, err) + } + if sc.Subnet == blockchainName { + chains = append(chains, sc.Name) + } + } + } + return chains, nil +}