Skip to content

Commit

Permalink
Merge branch 'develop' of https://github.com/zeta-chain/node into fix…
Browse files Browse the repository at this point in the history
…-solana-unsupported-tx-version-0
  • Loading branch information
ws4charlie committed Nov 25, 2024
2 parents f3f9458 + fffbcab commit 5539108
Show file tree
Hide file tree
Showing 73 changed files with 2,397 additions and 1,965 deletions.
8 changes: 5 additions & 3 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

## Unreleased

### Features

### Tests

### Refactor
* [3205](https://github.com/zeta-chain/node/issues/3205) - move Bitcoin revert address test to advanced group to avoid upgrade test failure

## Refactor

* [3170](https://github.com/zeta-chain/node/pull/3170) - revamp TSS package in zetaclient

### Fixes

Expand Down
3 changes: 2 additions & 1 deletion cmd/zetaclientd/initconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/zeta-chain/node/testutil/sample"
"github.com/zeta-chain/node/zetaclient/config"
zetatss "github.com/zeta-chain/node/zetaclient/tss"
)

// initializeConfigOptions is a set of CLI options for `init` command.
Expand Down Expand Up @@ -73,7 +74,7 @@ func InitializeConfig(_ *cobra.Command, _ []string) error {
// Validate Peer
// e.g. /ip4/172.0.2.1/tcp/6668/p2p/16Uiu2HAmACG5DtqmQsHtXg4G2sLS65ttv84e7MrL4kapkjfmhxAp
if opts.peer != "" {
if err := validatePeer(opts.peer); err != nil {
if _, err := zetatss.MultiAddressFromString(opts.peer); err != nil {
return errors.Wrap(err, "invalid peer address")
}
}
Expand Down
279 changes: 55 additions & 224 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,42 @@ package main

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
_ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

ecdsakeygen "github.com/bnb-chain/tss-lib/ecdsa/keygen"
"github.com/cometbft/cometbft/crypto/secp256k1"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
maddr "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"gitlab.com/thorchain/tss/go-tss/conversion"

"github.com/zeta-chain/node/pkg/authz"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
zetaos "github.com/zeta-chain/node/pkg/os"
"github.com/zeta-chain/node/pkg/ticker"
observerTypes "github.com/zeta-chain/node/x/observer/types"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/maintenance"
"github.com/zeta-chain/node/zetaclient/metrics"
"github.com/zeta-chain/node/zetaclient/orchestrator"
mc "github.com/zeta-chain/node/zetaclient/tss"
zetatss "github.com/zeta-chain/node/zetaclient/tss"
"github.com/zeta-chain/node/zetaclient/zetacore"
)

// todo revamp
// https://github.com/zeta-chain/node/issues/3119
// https://github.com/zeta-chain/node/issues/3112
var preParams *ecdsakeygen.LocalPreParams
const (
// enables posting blame data to core for failed TSS signatures
envFlagPostBlame = "POST_BLAME"
envPprofAddr = "PPROF_ADDR"
)

// Start starts zetaclientd process todo revamp
// https://github.com/zeta-chain/node/issues/3112
func Start(_ *cobra.Command, _ []string) error {
// Prompt for Hotkey, TSS key-share and relayer key passwords
titles := []string{"HotKey", "TSS", "Solana Relayer Key"}
Expand All @@ -69,13 +61,6 @@ func Start(_ *cobra.Command, _ []string) error {
return errors.Wrap(err, "initLogger failed")
}

// Wait until zetacore has started
if cfg.Peer != "" {
if err := validatePeer(cfg.Peer); err != nil {
return errors.Wrap(err, "unable to validate peer")
}
}

masterLogger := logger.Std
startLogger := logger.Std.With().Str("module", "startup").Logger()

Expand All @@ -95,6 +80,8 @@ func Start(_ *cobra.Command, _ []string) error {
}
}()

go runPprof(startLogger)

// CreateZetacoreClient: zetacore client is used for all communication to zetacore , which this client connects to.
// Zetacore accumulates votes , and provides a centralized source of truth for all clients
zetacoreClient, err := createZetacoreClient(cfg, hotkeyPass, masterLogger)
Expand Down Expand Up @@ -150,189 +137,42 @@ func Start(_ *cobra.Command, _ []string) error {

startLogger.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())

// Generate TSS address . The Tss address is generated through Keygen ceremony. The TSS key is used to sign all outbound transactions .
// The hotkeyPk is private key for the Hotkey. The Hotkey is used to sign all inbound transactions
// Each node processes a portion of the key stored in ~/.tss by default . Custom location can be specified in config file during init.
// After generating the key , the address is set on the zetacore
hotkeyPk, err := zetacoreClient.GetKeys().GetPrivateKey(hotkeyPass)
if err != nil {
startLogger.Error().Err(err).Msg("zetacore client GetPrivateKey error")
}
startLogger.Debug().Msgf("hotkeyPk %s", hotkeyPk.String())
if len(hotkeyPk.Bytes()) != 32 {
errMsg := fmt.Sprintf("key bytes len %d != 32", len(hotkeyPk.Bytes()))
log.Error().Msg(errMsg)
return errors.New(errMsg)
}
priKey := secp256k1.PrivKey(hotkeyPk.Bytes()[:32])

// Generate pre Params if not present already
peers, err := initPeers(cfg.Peer)
if err != nil {
log.Error().Err(err).Msg("peer address error")
}
initPreParams(cfg.PreParamsPath)

m, err := metrics.NewMetrics()
if err != nil {
log.Error().Err(err).Msg("NewMetrics")
return err
return errors.Wrap(err, "unable to create metrics")
}
m.Start()

metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

var tssHistoricalList []observerTypes.TSS
tssHistoricalList, err = zetacoreClient.GetTSSHistory(ctx)
telemetryServer.SetIPAddress(cfg.PublicIP)

granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, hotkeyPass)
if err != nil {
startLogger.Error().Err(err).Msg("GetTssHistory error")
return errors.Wrap(err, "unable to resolve observer pub key bech32")
}

telemetryServer.SetIPAddress(cfg.PublicIP)

keygen := appContext.GetKeygen()
whitelistedPeers := []peer.ID{}
for _, pk := range keygen.GranteePubkeys {
pid, err := conversion.Bech32PubkeyToPeerID(pk)
if err != nil {
return err
}
whitelistedPeers = append(whitelistedPeers, pid)
tssSetupProps := zetatss.SetupProps{
Config: cfg,
Zetacore: zetacoreClient,
GranteePubKeyBech32: granteePubKeyBech32,
HotKeyPassword: hotkeyPass,
TSSKeyPassword: tssKeyPass,
BitcoinChainIDs: btcChainIDsFromContext(appContext),
PostBlame: isEnvFlagEnabled(envFlagPostBlame),
Telemetry: telemetryServer,
}

// Create TSS server
tssServer, err := mc.SetupTSSServer(
peers,
priKey,
preParams,
appContext.Config(),
tssKeyPass,
true,
whitelistedPeers,
)
tss, err := zetatss.Setup(ctx, tssSetupProps, startLogger)
if err != nil {
return fmt.Errorf("SetupTSSServer error: %w", err)
return errors.Wrap(err, "unable to setup TSS service")
}

// Set P2P ID for telemetry
telemetryServer.SetP2PID(tssServer.GetLocalPeerID())

// Creating a channel to listen for os signals (or other signals)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

go func() {
for {
time.Sleep(30 * time.Second)
ps := tssServer.GetKnownPeers()
metrics.NumConnectedPeers.Set(float64(len(ps)))
telemetryServer.SetConnectedPeers(ps)
}
}()
go func() {
host := tssServer.GetP2PHost()
pingRTT := make(map[peer.ID]int64)
pingRTTLock := sync.Mutex{}
for {
var wg sync.WaitGroup
for _, p := range whitelistedPeers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
result := <-ping.Ping(ctx, host, p)
pingRTTLock.Lock()
defer pingRTTLock.Unlock()
if result.Error != nil {
masterLogger.Error().Err(result.Error).Msg("ping error")
pingRTT[p] = -1 // RTT -1 indicate ping error
return
}
pingRTT[p] = result.RTT.Nanoseconds()
}(p)
}
wg.Wait()
telemetryServer.SetPingRTT(pingRTT)
time.Sleep(30 * time.Second)
}
}()
// pprof http server
// zetacored/cometbft is already listening for pprof on 6060 (by default)
go func() {
// #nosec G114 -- timeouts uneeded
err := http.ListenAndServe("localhost:6061", nil)
if err != nil {
log.Error().Err(err).Msg("pprof http server error")
}
}()

// Generate a new TSS if keygen is set and add it into the tss server
// If TSS has already been generated, and keygen was successful ; we use the existing TSS
err = mc.Generate(ctx, zetacoreClient, tssServer, masterLogger)
if err != nil {
return err
}

tss, err := mc.New(
ctx,
zetacoreClient,
tssHistoricalList,
hotkeyPass,
tssServer,
)
if err != nil {
startLogger.Error().Err(err).Msg("NewTSS error")
return err
}
if cfg.TestTssKeysign {
err = mc.TestTSS(tss.CurrentPubkey, *tss.Server, masterLogger)
if err != nil {
startLogger.Error().Err(err).Msgf("TestTSS error : %s", tss.CurrentPubkey)
}
}

// Wait for TSS keygen to be successful before proceeding, This is a blocking thread only for a new keygen.
// For existing keygen, this should directly proceed to the next step
_ = ticker.Run(ctx, time.Second, func(ctx context.Context, t *ticker.Ticker) error {
keygen, err = zetacoreClient.GetKeyGen(ctx)
switch {
case err != nil:
startLogger.Warn().Err(err).Msg("Waiting for TSS Keygen to be a success, got error")
case keygen.Status != observerTypes.KeygenStatus_KeyGenSuccess:
startLogger.Warn().Msgf("Waiting for TSS Keygen to be a success, current status %s", keygen.Status)
default:
t.Stop()
}

return nil
})

// Update Current TSS value from zetacore, if TSS keygen is successful, the TSS address is set on zeta-core
// Returns err if the RPC call fails as zeta client needs the current TSS address to be set
// This is only needed in case of a new Keygen , as the TSS address is set on zetacore only after the keygen is successful i.e enough votes have been broadcast
currentTss, err := zetacoreClient.GetTSS(ctx)
if err != nil {
return errors.Wrap(err, "unable to get current TSS")
}

// Filter supported BTC chain IDs
btcChains := appContext.FilterChains(zctx.Chain.IsBitcoin)
btcChainIDs := make([]int64, len(btcChains))
for i, chain := range btcChains {
btcChainIDs[i] = chain.ID()
}

// Make sure the TSS EVM/BTC addresses are well formed.
// Zetaclient should not start if TSS addresses cannot be properly derived.
tss.CurrentPubkey = currentTss.TssPubkey
err = tss.ValidateAddresses(btcChainIDs)
if err != nil {
startLogger.Error().Err(err).Msg("TSS address validation failed")
return err
}

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
Expand Down Expand Up @@ -423,42 +263,6 @@ func Start(_ *cobra.Command, _ []string) error {
return nil
}

func initPeers(peer string) ([]maddr.Multiaddr, error) {
var peers []maddr.Multiaddr

if peer != "" {
address, err := maddr.NewMultiaddr(peer)
if err != nil {
log.Error().Err(err).Msg("NewMultiaddr error")
return []maddr.Multiaddr{}, err
}
peers = append(peers, address)
}
return peers, nil
}

func initPreParams(path string) {
if path != "" {
path = filepath.Clean(path)
log.Info().Msgf("pre-params file path %s", path)
preParamsFile, err := os.Open(path)
if err != nil {
log.Error().Err(err).Msg("open pre-params file failed; skip")
} else {
bz, err := io.ReadAll(preParamsFile)
if err != nil {
log.Error().Err(err).Msg("read pre-params file failed; skip")
} else {
err = json.Unmarshal(bz, &preParams)
if err != nil {
log.Error().Err(err).Msg("unmarshal pre-params file failed; skip and generate new one")
preParams = nil // skip reading pre-params; generate new one instead
}
}
}
}
}

// isObserverNode checks whether THIS node is an observer node.
func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error) {
observers, err := client.GetObserverList(ctx)
Expand All @@ -476,3 +280,30 @@ func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error)

return false, nil
}

func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) {
// Get observer's public key ("grantee pub key")
_, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword)
if err != nil {
return "", errors.Wrap(err, "unable to get keyring key base")
}

return granteePubKeyBech32, nil
}

// runPprof run pprof http server
// zetacored/cometbft is already listening for runPprof on 6060 (by default)
func runPprof(logger zerolog.Logger) {
addr := os.Getenv(envPprofAddr)
if addr == "" {
addr = "localhost:6061"
}

logger.Info().Str("addr", addr).Msg("starting pprof http server")

// #nosec G114 -- timeouts unneeded
err := http.ListenAndServe(addr, nil)
if err != nil {
logger.Error().Err(err).Msg("pprof http server error")
}
}
Loading

0 comments on commit 5539108

Please sign in to comment.