Skip to content

Commit

Permalink
feat: add zetaclientd-supervisor (#2113)
Browse files Browse the repository at this point in the history
* feat: add zetaclientd-supervisor

* pass through args directly

* clean shutdown

* Add optional autodownload

* remove old scripts

* workround being blocked by #2135

* keep restart-zetaclientd.sh in orchestrator for e2e

* lint fixes

* feedback updates

* autodownload optional and changelog

* feedback
  • Loading branch information
gartnera authored May 10, 2024
1 parent 7458093 commit 9014044
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 61 deletions.
6 changes: 6 additions & 0 deletions Dockerfile-upgrade
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ RUN --mount=type=cache,target="/root/.cache/go-build" cd /go/delivery/zeta-node/
RUN cp $GOPATH/bin/zetacored $GOPATH/bin/new/ && \
cp $GOPATH/bin/zetaclientd $GOPATH/bin/new/

COPY --from=oldbuild ${GOPATH}/bin/zetaclientd /root/.zetaclientd/upgrades/genesis/
RUN mkdir -p /root/.zetaclientd/upgrades/${NEW_VERSION}/ && \
cp ${GOPATH}/bin/zetaclientd /root/.zetaclientd/upgrades/${NEW_VERSION}/
RUN ln -s /root/.zetaclientd/upgrades/genesis /root/.zetaclientd/upgrades/current
ENV PATH="/root/.zetaclientd/upgrades/current:${PATH}"

COPY --from=oldbuild $GOPATH/bin/zetacored $GOPATH/bin/zetaclientd $GOPATH/bin/
COPY --from=oldbuild $GOPATH/bin/zetacored $GOPATH/bin/zetaclientd $GOPATH/bin/old

Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ install: go.sum
@echo "--> Installing zetacored & zetaclientd"
@go install -mod=readonly $(BUILD_FLAGS) ./cmd/zetacored
@go install -mod=readonly $(BUILD_FLAGS) ./cmd/zetaclientd
@go install -mod=readonly $(BUILD_FLAGS) ./cmd/zetaclientd-supervisor

install-zetaclient: go.sum
@echo "--> Installing zetaclientd"
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [2100](https://github.com/zeta-chain/node/pull/2100) - cosmos v0.47 upgrade
* [2145](https://github.com/zeta-chain/node/pull/2145) - add `ibc` and `ibc-transfer` modules
* [2135](https://github.com/zeta-chain/node/pull/2135) - add develop build version logic
* [2113](https://github.com/zeta-chain/node/pull/2113) - add zetaclientd-supervisor process

### Refactor

Expand Down
267 changes: 267 additions & 0 deletions cmd/zetaclientd-supervisor/lib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package main

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"runtime"
"strings"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/hashicorp/go-getter"
"github.com/rs/zerolog"
"github.com/zeta-chain/zetacore/zetaclient/config"
"google.golang.org/grpc"
)

const zetaclientdBinaryName = "zetaclientd"

var defaultUpgradesDir = os.ExpandEnv("$HOME/.zetaclientd/upgrades")

// serializedWriter wraps an io.Writer and ensures that writes to it from multiple goroutines
// are serialized
type serializedWriter struct {
upstream io.Writer
lock sync.Mutex
}

func (w *serializedWriter) Write(p []byte) (n int, err error) {
w.lock.Lock()
defer w.lock.Unlock()

return w.upstream.Write(p)
}

func getLogger(cfg config.Config, out io.Writer) zerolog.Logger {
var logger zerolog.Logger
switch cfg.LogFormat {
case "json":
logger = zerolog.New(out).Level(zerolog.Level(cfg.LogLevel)).With().Timestamp().Logger()
case "text":
logger = zerolog.New(zerolog.ConsoleWriter{Out: out, TimeFormat: time.RFC3339}).Level(zerolog.Level(cfg.LogLevel)).With().Timestamp().Logger()
default:
logger = zerolog.New(zerolog.ConsoleWriter{Out: out, TimeFormat: time.RFC3339})
}

return logger
}

type zetaclientdSupervisor struct {
zetacoredConn *grpc.ClientConn
reloadSignals chan bool
logger zerolog.Logger
upgradesDir string
upgradePlanName string
enableAutoDownload bool
}

func newZetaclientdSupervisor(zetaCoreURL string, logger zerolog.Logger, enableAutoDownload bool) (*zetaclientdSupervisor, error) {
logger = logger.With().Str("module", "zetaclientdSupervisor").Logger()
conn, err := grpc.Dial(
fmt.Sprintf("%s:9090", zetaCoreURL),
grpc.WithInsecure(),
)
if err != nil {
return nil, fmt.Errorf("grpc dial: %w", err)
}

return &zetaclientdSupervisor{
zetacoredConn: conn,
logger: logger,
reloadSignals: make(chan bool, 1),
upgradesDir: defaultUpgradesDir,
enableAutoDownload: enableAutoDownload,
}, nil
}

func (s *zetaclientdSupervisor) Start(ctx context.Context) {
go s.watchForVersionChanges(ctx)
go s.handleCoreUpgradePlan(ctx)
}

func (s *zetaclientdSupervisor) WaitForReloadSignal(ctx context.Context) {
select {
case <-s.reloadSignals:
case <-ctx.Done():
}
}

func (s *zetaclientdSupervisor) dirForVersion(version string) string {
return path.Join(s.upgradesDir, version)
}

func atomicSymlink(target, linkName string) error {
linkNameTmp := linkName + ".tmp"
_, err := os.Stat(target)
if err != nil {
return fmt.Errorf("stat target: %w", err)
}
err = os.Remove(linkNameTmp)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("remove old current tmp: %w", err)
}
err = os.Symlink(target, linkNameTmp)
if err != nil {
return fmt.Errorf("new symlink: %w", err)
}
err = os.Rename(linkNameTmp, linkName)
if err != nil {
return fmt.Errorf("rename symlink: %w", err)
}
return nil
}

func (s *zetaclientdSupervisor) watchForVersionChanges(ctx context.Context) {
client := tmservice.NewServiceClient(s.zetacoredConn)
prevVersion := ""
for {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
res, err := client.GetNodeInfo(ctx, &tmservice.GetNodeInfoRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("get node info")
continue
}
newVersion := res.ApplicationVersion.Version
if prevVersion == "" {
prevVersion = newVersion
}
if prevVersion == newVersion {
continue
}
s.logger.Warn().Msgf("core version change (%s -> %s)", prevVersion, newVersion)

prevVersion = newVersion

// TODO: just use newVersion when #2135 is merged
// even without #2135, the version will still change and trigger the update
newVersionDir := s.dirForVersion(s.upgradePlanName)
currentLinkPath := s.dirForVersion("current")

err = atomicSymlink(newVersionDir, currentLinkPath)
if err != nil {
s.logger.Error().Err(err).Msgf("unable to update current symlink (%s -> %s)", newVersionDir, currentLinkPath)
return
}
s.reloadSignals <- true
}
}

func (s *zetaclientdSupervisor) handleCoreUpgradePlan(ctx context.Context) {
client := upgradetypes.NewQueryClient(s.zetacoredConn)

prevPlanName := ""
for {
// wait for either a second or context cancel
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}

resp, err := client.CurrentPlan(ctx, &upgradetypes.QueryCurrentPlanRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("get current upgrade plan")
continue
}
if resp.Plan == nil {
continue
}
plan := resp.Plan
if prevPlanName == plan.Name {
continue
}
s.logger.Warn().Msgf("got new upgrade plan (%s)", plan.Name)
prevPlanName = plan.Name
s.upgradePlanName = plan.Name

if !s.enableAutoDownload {
s.logger.Warn().Msg("skipping autodownload because of configuration")
continue
}
err = s.downloadZetaclientd(ctx, plan)
if err != nil {
s.logger.Error().Err(err).Msg("downloadZetaclientd failed")
}
}
}

// UpgradeConfig is expected format for the info field to allow auto-download
// this structure is copied from cosmosvisor
type upgradeConfig struct {
Binaries map[string]string `json:"binaries"`
}

func (s *zetaclientdSupervisor) downloadZetaclientd(ctx context.Context, plan *upgradetypes.Plan) error {
if plan.Info == "" {
return errors.New("upgrade info empty")
}
var config upgradeConfig
err := json.Unmarshal([]byte(plan.Info), &config)
if err != nil {
return fmt.Errorf("unmarshal upgrade config: %w", err)
}

s.logger.Info().Msg("downloading zetaclientd")

binKey := fmt.Sprintf("%s-%s/%s", zetaclientdBinaryName, runtime.GOOS, runtime.GOARCH)
binURL, ok := config.Binaries[binKey]
if !ok {
return fmt.Errorf("no binary found for: %s", binKey)
}
upgradeDir := s.dirForVersion(plan.Name)
err = os.MkdirAll(upgradeDir, 0o750)
if err != nil {
return fmt.Errorf("mkdir %s: %w", upgradeDir, err)
}
upgradePath := path.Join(upgradeDir, zetaclientdBinaryName)
// TODO: retry?
// GetFile should validate checksum so long as it was provided in the url
err = getter.GetFile(upgradePath, binURL, getter.WithContext(ctx), getter.WithUmask(0o750))
if err != nil {
return fmt.Errorf("get file %s: %w", binURL, err)
}

// ensure binary is executable
info, err := os.Stat(upgradePath)
if err != nil {
return fmt.Errorf("stat binary: %w", err)
}
newMode := info.Mode().Perm() | 0o111
err = os.Chmod(upgradePath, newMode)
if err != nil {
return fmt.Errorf("chmod %s: %w", upgradePath, err)
}
return nil
}

func promptPasswords() (string, string, error) {
reader := bufio.NewReader(os.Stdin)
fmt.Print("HotKey Password: ")
hotKeyPass, err := reader.ReadString('\n')
if err != nil {
return "", "", err
}
fmt.Print("TSS Password: ")
tssKeyPass, err := reader.ReadString('\n')
if err != nil {
return "", "", err
}

//trim delimiters
hotKeyPass = strings.TrimSuffix(hotKeyPass, "\n")
tssKeyPass = strings.TrimSuffix(tssKeyPass, "\n")

return hotKeyPass, tssKeyPass, err
}
95 changes: 95 additions & 0 deletions cmd/zetaclientd-supervisor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"os/signal"
"syscall"
"time"

"github.com/zeta-chain/zetacore/app"
"github.com/zeta-chain/zetacore/zetaclient/config"
"golang.org/x/sync/errgroup"
)

func main() {
cfg, err := config.Load(app.DefaultNodeHome)
if err != nil {
panic(fmt.Errorf("failed to load config: %w", err))
}
// log outputs must be serialized since we are writing log messages in this process and
// also directly from the zetaclient process
serializedStdout := &serializedWriter{upstream: os.Stdout}
logger := getLogger(cfg, serializedStdout)
logger = logger.With().Str("process", "zetaclientd-supervisor").Logger()

ctx := context.Background()

// these signals will result in the supervisor process shutting down
shutdownChan := make(chan os.Signal, 1)
signal.Notify(shutdownChan, syscall.SIGINT, syscall.SIGTERM)

// these signals will result in the supervisor process only restarting zetaclientd
restartChan := make(chan os.Signal, 1)
signal.Notify(restartChan, syscall.SIGHUP)

hotkeyPassword, tssPassword, err := promptPasswords()
if err != nil {
panic(fmt.Errorf("unable to get passwords: %w", err))
}

_, enableAutoDownload := os.LookupEnv("ZETACLIENTD_SUPERVISOR_ENABLE_AUTO_DOWNLOAD")
supervisor, err := newZetaclientdSupervisor(cfg.ZetaCoreURL, logger, enableAutoDownload)
if err != nil {
panic(fmt.Errorf("unable to get supervisor: %w", err))
}
supervisor.Start(ctx)

shouldRestart := true
for shouldRestart {
ctx, cancel := context.WithCancel(ctx)
// pass args from supervisor directly to zetaclientd
cmd := exec.CommandContext(ctx, zetaclientdBinaryName, os.Args[1:]...) // #nosec G204
// by default, CommandContext sends SIGKILL. we want more graceful shutdown.
cmd.Cancel = func() error {
return cmd.Process.Signal(syscall.SIGINT)
}
cmd.Stdout = serializedStdout
cmd.Stderr = os.Stderr
// must reset the passwordInputBuffer every iteration because reads are stateful (seek to end)
passwordInputBuffer := bytes.Buffer{}
passwordInputBuffer.Write([]byte(hotkeyPassword + "\n" + tssPassword + "\n"))
cmd.Stdin = &passwordInputBuffer

eg, ctx := errgroup.WithContext(ctx)
eg.Go(cmd.Run)
eg.Go(func() error {
supervisor.WaitForReloadSignal(ctx)
cancel()
return nil
})
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case sig := <-restartChan:
logger.Info().Msgf("got signal %d, sending SIGINT to zetaclientd", sig)
case sig := <-shutdownChan:
logger.Info().Msgf("got signal %d, shutting down", sig)
shouldRestart = false
}
cancel()
}
})
err := eg.Wait()
if err != nil {
logger.Error().Err(err).Msg("error while waiting")
}
// prevent fast spin
time.Sleep(time.Second)
}
}
Loading

0 comments on commit 9014044

Please sign in to comment.