Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(zetaclient): streamline process initialization #3291

Merged
merged 9 commits into from
Dec 16, 2024
Merged
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
## Refactor

* [3170](https://github.com/zeta-chain/node/pull/3170) - revamp TSS package in zetaclient
* [3291](https://github.com/zeta-chain/node/pull/3291) - revamp zetaclient initialization (+ graceful shutdown)

### Fixes

Expand Down
274 changes: 95 additions & 179 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,18 @@ import (
"net/http"
_ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/zeta-chain/node/pkg/authz"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/graceful"
zetaos "github.com/zeta-chain/node/pkg/os"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/maintenance"
"github.com/zeta-chain/node/zetaclient/metrics"
"github.com/zeta-chain/node/zetaclient/orchestrator"
Expand All @@ -36,119 +30,55 @@ const (
envPprofAddr = "PPROF_ADDR"
)

// Start starts zetaclientd process todo revamp
// https://github.com/zeta-chain/node/issues/3112
// Start starts zetaclientd process
func Start(_ *cobra.Command, _ []string) error {
// Prompt for Hotkey, TSS key-share and relayer key passwords
titles := []string{"HotKey", "TSS", "Solana Relayer Key"}
passwords, err := zetaos.PromptPasswords(titles)
if err != nil {
return errors.Wrap(err, "unable to get passwords")
}
hotkeyPass, tssKeyPass, solanaKeyPass := passwords[0], passwords[1], passwords[2]
relayerKeyPasswords := map[string]string{
chains.Network_solana.String(): solanaKeyPass,
}

// Load Config file given path
cfg, err := config.Load(globalOpts.ZetacoreHome)
if err != nil {
return err
return errors.Wrap(err, "unable to load config")
}

logger, err := base.InitLogger(cfg)
dbPath, err := config.ResolveDBPath()
if err != nil {
return errors.Wrap(err, "initLogger failed")
return errors.Wrap(err, "unable to resolve db path")
}

masterLogger := logger.Std
startLogger := logger.Std.With().Str("module", "startup").Logger()

appContext := zctx.New(cfg, relayerKeyPasswords, masterLogger)
ctx := zctx.WithAppContext(context.Background(), appContext)

// Wait until zetacore is up
waitForZetaCore(cfg, startLogger)
startLogger.Info().Msgf("Zetacore is ready, trying to connect to %s", cfg.Peer)

telemetryServer := metrics.NewTelemetryServer()
go func() {
err := telemetryServer.Start()
if err != nil {
startLogger.Error().Err(err).Msg("telemetryServer error")
panic("telemetryServer error")
}
}()

go runPprof(startLogger)

// CreateZetacoreClient: zetacore client is used for all communication to zetacore , which this client connects to.
// Zetacore accumulates votes , and provides a centralized source of truth for all clients
zetacoreClient, err := createZetacoreClient(cfg, hotkeyPass, masterLogger)
// Configure logger (also overrides the default log level)
logger, err := base.NewLogger(cfg)
if err != nil {
return errors.Wrap(err, "unable to create zetacore client")
return errors.Wrap(err, "unable to create logger")
}

// Wait until zetacore is ready to create blocks
if err = waitForZetacoreToCreateBlocks(ctx, zetacoreClient, startLogger); err != nil {
startLogger.Error().Err(err).Msg("WaitForZetacoreToCreateBlocks error")
return err
}
startLogger.Info().Msgf("Zetacore client is ready")

// Set grantee account number and sequence number
err = zetacoreClient.SetAccountNumber(authz.ZetaClientGranteeKey)
passes, err := promptPasswords()
if err != nil {
startLogger.Error().Err(err).Msg("SetAccountNumber error")
return err
return errors.Wrap(err, "unable to prompt for passwords")
}

// cross-check chainid
res, err := zetacoreClient.GetNodeInfo(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetNodeInfo error")
return err
}
appContext := zctx.New(cfg, passes.relayerKeys(), logger.Std)
ctx := zctx.WithAppContext(context.Background(), appContext)

if strings.Compare(res.GetDefaultNodeInfo().Network, cfg.ChainID) != 0 {
startLogger.Warn().
Msgf("chain id mismatch, zetacore chain id %s, zetaclient configured chain id %s; reset zetaclient chain id", res.GetDefaultNodeInfo().Network, cfg.ChainID)
cfg.ChainID = res.GetDefaultNodeInfo().Network
err := zetacoreClient.UpdateChainID(cfg.ChainID)
if err != nil {
return err
}
telemetry, err := startTelemetry(ctx, cfg)
if err != nil {
return errors.Wrap(err, "unable to start telemetry")
}

// CreateAuthzSigner : which is used to sign all authz messages . All votes broadcast to zetacore are wrapped in authz exec .
// This is to ensure that the user does not need to keep their operator key online , and can use a cold key to sign votes
signerAddress, err := zetacoreClient.GetKeys().GetAddress()
// zetacore client is used for all communication to zeta node.
// it accumulates votes, and provides a source of truth for all clients
//
// This call crated client, ensured block production, then prepares the client
zetacoreClient, err := zetacore.NewFromConfig(ctx, &cfg, passes.hotkey, logger.Std)
if err != nil {
return errors.Wrap(err, "error getting signer address")
return errors.Wrap(err, "unable to create zetacore client from config")
}

createAuthzSigner(zetacoreClient.GetKeys().GetOperatorAddress().String(), signerAddress)
startLogger.Debug().Msgf("createAuthzSigner is ready")

// Initialize core parameters from zetacore
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, startLogger); err != nil {
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, logger.Std); err != nil {
return errors.Wrap(err, "unable to update app context")
}

startLogger.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())

m, err := metrics.NewMetrics()
if err != nil {
return errors.Wrap(err, "unable to create metrics")
}
m.Start()

metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

telemetryServer.SetIPAddress(cfg.PublicIP)
log.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())

granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, hotkeyPass)
granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, passes.hotkey)
if err != nil {
return errors.Wrap(err, "unable to resolve observer pub key bech32")
}
Expand All @@ -157,44 +87,34 @@ func Start(_ *cobra.Command, _ []string) error {
Config: cfg,
Zetacore: zetacoreClient,
GranteePubKeyBech32: granteePubKeyBech32,
HotKeyPassword: hotkeyPass,
TSSKeyPassword: tssKeyPass,
HotKeyPassword: passes.hotkey,
TSSKeyPassword: passes.tss,
BitcoinChainIDs: btcChainIDsFromContext(appContext),
PostBlame: isEnvFlagEnabled(envFlagPostBlame),
Telemetry: telemetryServer,
Telemetry: telemetry,
}

tss, err := zetatss.Setup(ctx, tssSetupProps, startLogger)
tss, err := zetatss.Setup(ctx, tssSetupProps, logger.Std)
if err != nil {
return errors.Wrap(err, "unable to setup TSS service")
}

// Creating a channel to listen for os signals (or other signals)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
})

if len(appContext.ListChainIDs()) == 0 {
startLogger.Error().Interface("config", cfg).Msgf("No chains in updated config")
}

isObserver, err := isObserverNode(ctx, zetacoreClient)
switch {
case err != nil:
startLogger.Error().Msgf("Unable to determine if node is an observer")
return err
return errors.Wrap(err, "unable to check if observer node")
case !isObserver:
addr := zetacoreClient.GetKeys().GetOperatorAddress().String()
startLogger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0")
logger.Std.Warn().Msg("This node is not an observer node. Exit 0")
return nil
}

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() {
logger.Std.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
graceful.ShutdownNow()
})

// CreateSignerMap: This creates a map of all signers for each chain.
// Each signer is responsible for signing transactions for a particular chain
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger)
Expand All @@ -203,16 +123,9 @@ func Start(_ *cobra.Command, _ []string) error {
return err
}

userDir, err := os.UserHomeDir()
if err != nil {
log.Error().Err(err).Msg("os.UserHomeDir")
return err
}
dbpath := filepath.Join(userDir, ".zetaclient/chainobserver")

// Creates a map of all chain observers for each chain.
// Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer)
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetry)
if err != nil {
return errors.Wrap(err, "unable to create chain observer map")
}
Expand All @@ -226,84 +139,87 @@ func Start(_ *cobra.Command, _ []string) error {
signerMap,
observerMap,
tss,
dbpath,
dbPath,
logger,
telemetryServer,
telemetry,
)
if err != nil {
return errors.Wrap(err, "unable to create orchestrator")
}

// Start orchestrator with all observers and signers
if err = maestro.Start(ctx); err != nil {
return errors.Wrap(err, "unable to start orchestrator")
}
graceful.AddService(ctx, maestro)

// start zeta supply checker
// TODO: enable
// https://github.com/zeta-chain/node/issues/1354
// NOTE: this is disabled for now because we need to determine the frequency on how to handle invalid check
// The method uses GRPC query to the node we might need to improve for performance
//zetaSupplyChecker, err := mc.NewZetaSupplyChecker(cfg, zetacoreClient, masterLogger)
//if err != nil {
// startLogger.Err(err).Msg("NewZetaSupplyChecker")
//}
//if err == nil {
// zetaSupplyChecker.Start()
// defer zetaSupplyChecker.Stop()
//}
// Block current routine until a shutdown signal is received
graceful.WaitForShutdown()

startLogger.Info().Msg("zetaclientd is running")
return nil
}

sig := <-signalChannel
startLogger.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig)
type passwords struct {
hotkey string
tss string
solanaRelayerKey string
}

maestro.Stop()
// promptPasswords prompts for Hotkey, TSS key-share and relayer key passwords
func promptPasswords() (passwords, error) {
titles := []string{"HotKey", "TSS", "Solana Relayer Key"}

return nil
res, err := zetaos.PromptPasswords(titles)
if err != nil {
return passwords{}, errors.Wrap(err, "unable to get passwords")
}

return passwords{
hotkey: res[0],
tss: res[1],
solanaRelayerKey: res[2],
}, nil
}

// isObserverNode checks whether THIS node is an observer node.
func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error) {
observers, err := client.GetObserverList(ctx)
if err != nil {
return false, errors.Wrap(err, "unable to get observers list")
func (p passwords) relayerKeys() map[string]string {
return map[string]string{
chains.Network_solana.String(): p.solanaRelayerKey,
}
}

operatorAddress := client.GetKeys().GetOperatorAddress().String()
func startTelemetry(ctx context.Context, cfg config.Config) (*metrics.TelemetryServer, error) {
// 1. Init pprof http server
pprofServer := func(_ context.Context) error {
addr := os.Getenv(envPprofAddr)
if addr == "" {
addr = "localhost:6061"
}

log.Info().Str("addr", addr).Msg("starting pprof http server")

for _, observer := range observers {
if observer == operatorAddress {
return true, nil
// #nosec G114 -- timeouts unneeded
err := http.ListenAndServe(addr, nil)
if err != nil {
log.Error().Err(err).Msg("pprof http server error")
}
}

return false, nil
}
return nil
}

func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) {
// Get observer's public key ("grantee pub key")
_, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword)
// 2. Init metrics server
metricsServer, err := metrics.NewMetrics()
if err != nil {
return "", errors.Wrap(err, "unable to get keyring key base")
return nil, errors.Wrap(err, "unable to create metrics")
}

return granteePubKeyBech32, nil
}
metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

// runPprof run pprof http server
// zetacored/cometbft is already listening for runPprof on 6060 (by default)
func runPprof(logger zerolog.Logger) {
addr := os.Getenv(envPprofAddr)
if addr == "" {
addr = "localhost:6061"
}
// 3. Init telemetry server
telemetry := metrics.NewTelemetryServer()
telemetry.SetIPAddress(cfg.PublicIP)

logger.Info().Str("addr", addr).Msg("starting pprof http server")
// 4. Add services to the process
graceful.AddStarter(ctx, pprofServer)
graceful.AddService(ctx, metricsServer)
graceful.AddService(ctx, telemetry)

// #nosec G114 -- timeouts unneeded
err := http.ListenAndServe(addr, nil)
if err != nil {
logger.Error().Err(err).Msg("pprof http server error")
}
return telemetry, nil
}
Loading
Loading