From 4c90ca05a781637a2762c9ad6eab56d7759d9ad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Wed, 9 Aug 2023 19:50:53 +0200 Subject: [PATCH] *: update v0.17 branch with latest code for release (#2528) The following PRs have been cherry-picked: - https://github.com/ObolNetwork/charon/pull/2490 - https://github.com/ObolNetwork/charon/pull/2494 - https://github.com/ObolNetwork/charon/pull/2496 - https://github.com/ObolNetwork/charon/pull/2509 - https://github.com/ObolNetwork/charon/pull/2510 - https://github.com/ObolNetwork/charon/pull/2504 - https://github.com/ObolNetwork/charon/pull/2511 - https://github.com/ObolNetwork/charon/pull/2514 - https://github.com/ObolNetwork/charon/pull/2516 - https://github.com/ObolNetwork/charon/pull/2518 category: misc ticket: none --- .github/workflows/golangci-lint.yml | 2 +- .github/workflows/govulncheck.yml | 2 +- .github/workflows/nightly-tests.yml | 2 +- .github/workflows/pre-commit.yml | 2 +- .github/workflows/release.yml | 2 +- .github/workflows/test.yml | 33 ++++- .github/workflows/track-pr.yml | 2 +- .github/workflows/verify-pr.yml | 2 +- .golangci.yml | 4 +- Dockerfile | 2 +- app/app.go | 6 +- app/obolapi/api.go | 55 ++++++-- app/obolapi/api_test.go | 49 ++++++- cmd/createcluster.go | 30 ++++- core/validatorapi/router.go | 42 +++--- core/validatorapi/router_internal_test.go | 48 ++++++- dkg/bcast/client.go | 3 +- dkg/bcast/helpers.go | 3 + dkg/bcast/server.go | 2 + dkg/dkg.go | 122 ++++++++++++++---- dkg/dkg_test.go | 1 + dkg/dkgpb/v1/sync.pb.go | 34 +++-- dkg/dkgpb/v1/sync.proto | 1 + dkg/exchanger.go | 117 ++++++++++++----- dkg/exchanger_internal_test.go | 67 +++++++++- dkg/sync/client.go | 39 +++++- dkg/sync/server.go | 59 +++++++++ dkg/sync/sync_test.go | 23 +++- p2p/receive.go | 5 +- p2p/sender.go | 34 ++++- p2p/sender_test.go | 37 ++++++ testutil/compose/compose/main.go | 6 +- testutil/compose/config.go | 8 +- testutil/compose/fuzz/beacon_fuzz_test.go | 60 --------- 
testutil/compose/fuzz/fuzz_test.go | 86 ++++++++++++ testutil/compose/lock.go | 2 +- testutil/compose/run.go | 8 +- .../testdata/TestNewDefaultConfig.golden | 3 +- testutil/promrated/Dockerfile | 2 +- 39 files changed, 776 insertions(+), 229 deletions(-) delete mode 100644 testutil/compose/fuzz/beacon_fuzz_test.go create mode 100644 testutil/compose/fuzz/fuzz_test.go diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index dc622e4c7..5203ed124 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -16,7 +16,7 @@ jobs: # Config options can be found in README here: https://github.com/golangci/golangci-lint-action - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - uses: actions/checkout@v3 - name: golangci-lint uses: golangci/golangci-lint-action@v3 diff --git a/.github/workflows/govulncheck.yml b/.github/workflows/govulncheck.yml index 173451ced..321a03812 100644 --- a/.github/workflows/govulncheck.yml +++ b/.github/workflows/govulncheck.yml @@ -14,6 +14,6 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - run: go install golang.org/x/vuln/cmd/govulncheck@latest - run: govulncheck -show=stacks -test ./... 
diff --git a/.github/workflows/nightly-tests.yml b/.github/workflows/nightly-tests.yml index e039ee45c..1e20103ec 100644 --- a/.github/workflows/nightly-tests.yml +++ b/.github/workflows/nightly-tests.yml @@ -11,7 +11,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - uses: actions/cache@v3 with: path: | diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index b532a8b08..d86018fe2 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -16,7 +16,7 @@ jobs: - uses: actions/setup-python@v2 - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - uses: pre-commit/action@v2.0.3 - name: notify failure diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 1565477e4..2a6a38914 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -15,7 +15,7 @@ jobs: fetch-depth: 0 # Disable shallow checkout - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - run: go run . 
--help > cli-reference.txt - run: go run testutil/genchangelog/main.go - uses: softprops/action-gh-release@v1 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4aba8cfa5..70eeafcb7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,7 +11,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - uses: actions/cache@v3 with: path: | @@ -33,7 +33,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - uses: actions/cache@v3 with: path: | @@ -51,7 +51,7 @@ jobs: - uses: docker/setup-buildx-action@v2 # For compose to build images - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - uses: actions/cache@v3 with: path: | @@ -71,6 +71,33 @@ jobs: path: testutil/compose/smoke/*.log retention-days: 3 + fuzz_tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: docker/setup-buildx-action@v2 # For compose to build images + - uses: actions/setup-go@v4 + with: + go-version: '1.20.7' + - uses: actions/cache@v3 + with: + path: | + ~/go/pkg/mod + ~/.cache/go-build + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go- + - run: | + echo "CHARON_REPO=$(pwd)" >> $GITHUB_ENV + echo "DOCKER_BUILDKIT=1" >> $GITHUB_ENV + - run: go test -race github.com/obolnetwork/charon/testutil/compose/fuzz -v -fuzzer -sudo-perms -timeout=45m -log-dir=. 
+ - uses: actions/upload-artifact@v3 + if: always() + with: + name: fuzz-test-logs + path: testutil/compose/fuzz/*.log + retention-days: 3 + notify_failure: runs-on: ubuntu-latest needs: [ unit_tests, integration_tests, compose_tests ] diff --git a/.github/workflows/track-pr.yml b/.github/workflows/track-pr.yml index edfb0ff1f..0f06b76e2 100644 --- a/.github/workflows/track-pr.yml +++ b/.github/workflows/track-pr.yml @@ -14,7 +14,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - name: "Track PR" run: go run github.com/obolnetwork/charon/testutil/trackpr diff --git a/.github/workflows/verify-pr.yml b/.github/workflows/verify-pr.yml index 80678e9ac..328669e50 100644 --- a/.github/workflows/verify-pr.yml +++ b/.github/workflows/verify-pr.yml @@ -12,7 +12,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v4 with: - go-version: '1.20.6' + go-version: '1.20.7' - name: "Verify PR" run: go run github.com/obolnetwork/charon/testutil/verifypr diff --git a/.golangci.yml b/.golangci.yml index 71c6bb3b8..911e10ba2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,6 @@ run: timeout: 5m - go: "1.20.6" + go: "1.20.7" linters-settings: cyclop: max-complexity: 15 @@ -89,7 +89,7 @@ linters-settings: - "github.com/gogo/protobuf/proto" # Prefer google.golang.org/protobuf - "github.com/prometheus/client_golang/prometheus/promauto" # Prefer ./app/promauto staticcheck: - go: "1.20.6" + go: "1.20.7" checks: - "all" - "-SA1019" # Ignoring since github.com/drand/kyber/sign/bls uses Proof Of Possession as does Ethereum. diff --git a/Dockerfile b/Dockerfile index 3578c6337..474441e7f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Container for building Go binary. 
-FROM golang:1.20.6-bullseye AS builder +FROM golang:1.20.7-bullseye AS builder # Install dependencies RUN apt-get update && apt-get install -y build-essential git # Prep and copy source diff --git a/app/app.go b/app/app.go index 5f45823db..7ee1ba71d 100644 --- a/app/app.go +++ b/app/app.go @@ -427,7 +427,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, return err } - if err := wireVAPIRouter(life, conf.ValidatorAPIAddr, eth2Cl, vapi, vapiCalls); err != nil { + if err := wireVAPIRouter(ctx, life, conf.ValidatorAPIAddr, eth2Cl, vapi, vapiCalls); err != nil { return err } @@ -889,10 +889,10 @@ func createMockValidators(pubkeys []eth2p0.BLSPubKey) beaconmock.ValidatorSet { } // wireVAPIRouter constructs the validator API router and registers it with the life cycle manager. -func wireVAPIRouter(life *lifecycle.Manager, vapiAddr string, eth2Cl eth2wrap.Client, +func wireVAPIRouter(ctx context.Context, life *lifecycle.Manager, vapiAddr string, eth2Cl eth2wrap.Client, handler validatorapi.Handler, vapiCalls func(), ) error { - vrouter, err := validatorapi.NewRouter(handler, eth2Cl) + vrouter, err := validatorapi.NewRouter(ctx, handler, eth2Cl) if err != nil { return errors.Wrap(err, "new monitoring server") } diff --git a/app/obolapi/api.go b/app/obolapi/api.go index 40c88608b..02ef3d18e 100644 --- a/app/obolapi/api.go +++ b/app/obolapi/api.go @@ -5,6 +5,7 @@ package obolapi import ( "bytes" "context" + "fmt" "io" "net/http" "net/url" @@ -15,11 +16,21 @@ import ( "github.com/obolnetwork/charon/cluster" ) +const ( + // launchpadReturnPathFmt is the URL path format string at which one can find details for a given cluster lock hash. + launchpadReturnPathFmt = "/lock/0x%X/launchpad" +) + // New returns a new Client. 
-func New(url string) Client { - return Client{ - baseURL: url, +func New(urlStr string) (Client, error) { + _, err := url.ParseRequestURI(urlStr) // check that urlStr is valid + if err != nil { + return Client{}, errors.Wrap(err, "could not parse Obol API URL") } + + return Client{ + baseURL: urlStr, + }, nil } // Client is the REST client for obol-api requests. @@ -27,27 +38,31 @@ type Client struct { baseURL string // Base obol-api URL } +// url returns a *url.URL from the baseURL stored in c. +// Will panic if somehow c.baseURL got corrupted, and it's not a valid URL anymore. +func (c Client) url() *url.URL { + baseURL, err := url.ParseRequestURI(c.baseURL) + if err != nil { + panic(errors.Wrap(err, "could not parse Obol API URL, this should never happen")) + } + + return baseURL +} + // PublishLock posts the lockfile to obol-api. func (c Client) PublishLock(ctx context.Context, lock cluster.Lock) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - addr, err := url.JoinPath(c.baseURL, "lock") - if err != nil { - return errors.Wrap(err, "invalid address") - } - - url, err := url.Parse(addr) - if err != nil { - return errors.Wrap(err, "invalid endpoint") - } + addr := c.url() + addr.Path = "lock" b, err := lock.MarshalJSON() if err != nil { return errors.Wrap(err, "marshal lock") } - err = httpPost(ctx, url, b) + err = httpPost(ctx, addr, b) if err != nil { return err } @@ -55,6 +70,20 @@ func (c Client) PublishLock(ctx context.Context, lock cluster.Lock) error { return nil } +// LaunchpadURLForLock returns the Launchpad cluster dashboard page for a given lock, on the given +// Obol API client. 
+func (c Client) LaunchpadURLForLock(lock cluster.Lock) string { + lURL := c.url() + + lURL.Path = launchpadURLPath(lock) + + return lURL.String() +} + +func launchpadURLPath(lock cluster.Lock) string { + return fmt.Sprintf(launchpadReturnPathFmt, lock.LockHash) +} + func httpPost(ctx context.Context, url *url.URL, b []byte) error { req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), bytes.NewReader(b)) if err != nil { diff --git a/app/obolapi/api_test.go b/app/obolapi/api_test.go index 2e11be9b6..275eb8571 100644 --- a/app/obolapi/api_test.go +++ b/app/obolapi/api_test.go @@ -3,11 +3,13 @@ package obolapi_test import ( + "bytes" "context" "encoding/json" "io" "net/http" "net/http/httptest" + "net/url" "testing" "github.com/stretchr/testify/require" @@ -45,8 +47,51 @@ func TestLockPublish(t *testing.T) { lock, _, _ := cluster.NewForT(t, 3, 3, 4, 0, opts...) - cl := obolapi.New(srv.URL) - err := cl.PublishLock(ctx, lock) + cl, err := obolapi.New(srv.URL) require.NoError(t, err) + err = cl.PublishLock(ctx, lock) + require.NoError(t, err) + }) +} + +func TestURLParsing(t *testing.T) { + t.Run("invalid url", func(t *testing.T) { + cl, err := obolapi.New("badURL") + require.Error(t, err) + require.Empty(t, cl) + }) + + t.Run("http url", func(t *testing.T) { + cl, err := obolapi.New("http://unsafe.today") + require.NoError(t, err) + require.NotEmpty(t, cl) + }) + + t.Run("https url", func(t *testing.T) { + cl, err := obolapi.New("https://safe.today") + require.NoError(t, err) + require.NotEmpty(t, cl) + }) +} + +func TestLaunchpadDashURL(t *testing.T) { + t.Run("produced url is what we expect", func(t *testing.T) { + cl, err := obolapi.New("https://safe.today") + require.NoError(t, err) + require.NotEmpty(t, cl) + + result := cl.LaunchpadURLForLock(cluster.Lock{LockHash: bytes.Repeat([]byte{0x42}, 32)}) + + require.NotEmpty(t, result) + + parsedRes, err := url.ParseRequestURI(result) + require.NoError(t, err) + + require.Equal(t, "safe.today", 
parsedRes.Host) + require.Equal( + t, + "/lock/0x4242424242424242424242424242424242424242424242424242424242424242/launchpad", + parsedRes.Path, + ) }) } diff --git a/cmd/createcluster.go b/cmd/createcluster.go index 950e2d50e..10f52a57f 100644 --- a/cmd/createcluster.go +++ b/cmd/createcluster.go @@ -257,9 +257,14 @@ func runCreateCluster(ctx context.Context, w io.Writer, conf clusterConfig) erro lock.NodeSignatures = append(lock.NodeSignatures, nodeSig) } + // dashboardURL is the Launchpad dashboard url for a given lock file. + // If empty, either conf.Publish wasn't specified or there was a processing error in publishing + // the generated lock file. + var dashboardURL string + // Write cluster-lock file if conf.Publish { - if err = writeLockToAPI(ctx, conf.PublishAddr, lock); err != nil { + if dashboardURL, err = writeLockToAPI(ctx, conf.PublishAddr, lock); err != nil { log.Warn(ctx, "Couldn't publish lock file to Obol API", err) } } @@ -272,7 +277,15 @@ func runCreateCluster(ctx context.Context, w io.Writer, conf clusterConfig) erro writeWarning(w) } - return writeOutput(w, conf.SplitKeys, conf.ClusterDir, numNodes, keysToDisk) + if err := writeOutput(w, conf.SplitKeys, conf.ClusterDir, numNodes, keysToDisk); err != nil { + return err + } + + if dashboardURL != "" { + log.Info(ctx, fmt.Sprintf("You can find your newly-created cluster dashboard here: %s", dashboardURL)) + } + + return nil } // validateCreateConfig returns an error if any of the provided config parameters are invalid. @@ -943,17 +956,20 @@ func randomHex64() (string, error) { return hex.EncodeToString(b), nil } -// writeLockToAPI posts the lock file to obol-api. -func writeLockToAPI(ctx context.Context, publishAddr string, lock cluster.Lock) error { - cl := obolapi.New(publishAddr) +// writeLockToAPI posts the lock file to obol-api and returns the Launchpad dashboard URL. 
+func writeLockToAPI(ctx context.Context, publishAddr string, lock cluster.Lock) (string, error) { + cl, err := obolapi.New(publishAddr) + if err != nil { + return "", err + } if err := cl.PublishLock(ctx, lock); err != nil { - return err + return "", err } log.Info(ctx, "Published lock file", z.Str("addr", publishAddr)) - return nil + return cl.LaunchpadURLForLock(lock), nil } // validateAddresses checks if we have sufficient addresses. It also fills addresses slices if only one is provided. diff --git a/core/validatorapi/router.go b/core/validatorapi/router.go index 37f4c1819..273473c58 100644 --- a/core/validatorapi/router.go +++ b/core/validatorapi/router.go @@ -80,7 +80,7 @@ type Handler interface { // NewRouter returns a new validator http server router. The http router // translates http requests related to the distributed validator to the Handler. // All other requests are reverse-proxied to the beacon-node address. -func NewRouter(h Handler, eth2Cl eth2wrap.Client) (*mux.Router, error) { +func NewRouter(ctx context.Context, h Handler, eth2Cl eth2wrap.Client) (*mux.Router, error) { // Register subset of distributed validator related endpoints. endpoints := []struct { Name string @@ -215,7 +215,7 @@ func NewRouter(h Handler, eth2Cl eth2wrap.Client) (*mux.Router, error) { } // Everything else is proxied - r.PathPrefix("/").Handler(proxyHandler(eth2Cl)) + r.PathPrefix("/").Handler(proxyHandler(ctx, eth2Cl)) return r, nil } @@ -880,11 +880,17 @@ func nodeVersion(p eth2client.NodeVersionProvider) handlerFunc { } } +// addressProvider provides the address of the active beacon node. +type addressProvider interface { + Address() string +} + // proxyHandler returns a reverse proxy handler. -func proxyHandler(eth2Cl eth2wrap.Client) http.HandlerFunc { +// Proxied requests use the provided context, so are cancelled when the context is cancelled. 
+func proxyHandler(ctx context.Context, addrProvider addressProvider) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // Get active beacon node address. - targetURL, err := getBeaconNodeAddress(r.Context(), eth2Cl) + targetURL, err := getBeaconNodeAddress(addrProvider) if err != nil { ctx := log.WithTopic(r.Context(), "vapi") log.Error(ctx, "Proxy target beacon node address", err) @@ -894,7 +900,6 @@ func proxyHandler(eth2Cl eth2wrap.Client) http.HandlerFunc { } // Get address for active beacon node proxy := httputil.NewSingleHostReverseProxy(targetURL) - // Extend default proxy director with basic auth and host header. defaultDirector := proxy.Director proxy.Director = func(req *http.Request) { @@ -907,31 +912,18 @@ func proxyHandler(eth2Cl eth2wrap.Client) http.HandlerFunc { } proxy.ErrorLog = stdlog.New(io.Discard, "", 0) + // Use provided context for proxied requests, so long running + // requests are cancelled when this context is cancelled (soft shutdown). + clonedReq := r.Clone(ctx) + defer observeAPILatency("proxy")() - proxy.ServeHTTP(proxyResponseWriter{w.(writeFlusher)}, r) + proxy.ServeHTTP(proxyResponseWriter{w.(writeFlusher)}, clonedReq) } } // getBeaconNodeAddress returns an active beacon node proxy target address. -func getBeaconNodeAddress(ctx context.Context, eth2Cl eth2wrap.Client) (*url.URL, error) { - addr := eth2Cl.Address() - if addr == "none" { - // Trigger refresh of inactive clients to hopefully resolve any active clients. - syncProvider, ok := eth2Cl.(eth2client.NodeSyncingProvider) - if !ok { - return nil, errors.New("invalid eth2 client") - } - _, err := syncProvider.NodeSyncing(ctx) - if err != nil { - return nil, errors.New("no active beacon nodes") // Not wrapping since error will be confusing. 
- } - - addr = eth2Cl.Address() - if addr == "none" { - return nil, errors.New("no active beacon nodes") - } - } - +func getBeaconNodeAddress(addrProvider addressProvider) (*url.URL, error) { + addr := addrProvider.Address() targetURL, err := url.Parse(addr) if err != nil { return nil, errors.Wrap(err, "invalid beacon node address", z.Str("address", addr)) diff --git a/core/validatorapi/router_internal_test.go b/core/validatorapi/router_internal_test.go index 87d0dd67e..e8a497030 100644 --- a/core/validatorapi/router_internal_test.go +++ b/core/validatorapi/router_internal_test.go @@ -44,13 +44,47 @@ const ( infoLevel = 1 // 1 is InfoLevel, this avoids importing zerolog directly. ) +type addr string + +func (a addr) Address() string { + return string(a) +} + +func TestProxyShutdown(t *testing.T) { + // Start a server that will block until the request is cancelled. + serving := make(chan struct{}) + target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + close(serving) + <-r.Context().Done() + })) + + // Start a proxy server that will proxy to the target server. + ctx, cancel := context.WithCancel(context.Background()) + proxy := httptest.NewServer(proxyHandler(ctx, addr(target.URL))) + + // Make a request to the proxy server, this will block until the proxy is shutdown. + done := make(chan struct{}) + go func() { + _, err := http.Get(proxy.URL) + require.NoError(t, err) + close(done) + }() + + // Wait for the target server is serving the request. + <-serving + // Shutdown the proxy server. + cancel() + // Wait for the request to complete. 
+ <-done +} + func TestRouterIntegration(t *testing.T) { beaconURL, ok := os.LookupEnv("BEACON_URL") if !ok { t.Skip("Skipping integration test since BEACON_URL not found") } - r, err := NewRouter(Handler(nil), testBeaconAddr{addr: beaconURL}) + r, err := NewRouter(context.Background(), Handler(nil), testBeaconAddr{addr: beaconURL}) require.NoError(t, err) server := httptest.NewServer(r) @@ -1031,7 +1065,7 @@ func TestBeaconCommitteeSelections(t *testing.T) { proxy := httptest.NewServer(handler.newBeaconHandler(t)) defer proxy.Close() - r, err := NewRouter(handler, testBeaconAddr{addr: proxy.URL}) + r, err := NewRouter(ctx, handler, testBeaconAddr{addr: proxy.URL}) require.NoError(t, err) server := httptest.NewServer(r) @@ -1093,7 +1127,7 @@ func TestSubmitAggregateAttestations(t *testing.T) { proxy := httptest.NewServer(handler.newBeaconHandler(t)) defer proxy.Close() - r, err := NewRouter(handler, testBeaconAddr{addr: proxy.URL}) + r, err := NewRouter(ctx, handler, testBeaconAddr{addr: proxy.URL}) require.NoError(t, err) server := httptest.NewServer(r) @@ -1118,14 +1152,14 @@ func testRouter(t *testing.T, handler testHandler, callback func(context.Context proxy := httptest.NewServer(handler.newBeaconHandler(t)) defer proxy.Close() - r, err := NewRouter(handler, testBeaconAddr{addr: proxy.URL}) + ctx := context.Background() + + r, err := NewRouter(ctx, handler, testBeaconAddr{addr: proxy.URL}) require.NoError(t, err) server := httptest.NewServer(r) defer server.Close() - ctx := context.Background() - cl, err := eth2http.New(ctx, eth2http.WithAddress(server.URL), eth2http.WithLogLevel(infoLevel)) require.NoError(t, err) @@ -1139,7 +1173,7 @@ func testRawRouter(t *testing.T, handler testHandler, callback func(context.Cont proxy := httptest.NewServer(handler.newBeaconHandler(t)) defer proxy.Close() - r, err := NewRouter(handler, testBeaconAddr{addr: proxy.URL}) + r, err := NewRouter(context.Background(), handler, testBeaconAddr{addr: proxy.URL}) require.NoError(t, 
err) server := httptest.NewServer(r) diff --git a/dkg/bcast/client.go b/dkg/bcast/client.go index 853648995..94141ed10 100644 --- a/dkg/bcast/client.go +++ b/dkg/bcast/client.go @@ -133,7 +133,8 @@ func (c *client) Broadcast(ctx context.Context, msgID string, msg proto.Message) continue // Skip self. } - err := c.sendFunc(ctx, c.tcpNode, protocolIDMsg, pID, bcastMsg, p2p.WithDelimitedProtocol(protocolIDMsg)) + err := c.sendFunc(ctx, c.tcpNode, protocolIDMsg, pID, bcastMsg, + p2p.WithDelimitedProtocol(protocolIDMsg), p2p.WithSendTimeout(sendTimeout)) if err != nil { return errors.Wrap(err, "send message") } diff --git a/dkg/bcast/helpers.go b/dkg/bcast/helpers.go index 9020b7aed..6c08a5b1f 100644 --- a/dkg/bcast/helpers.go +++ b/dkg/bcast/helpers.go @@ -4,6 +4,7 @@ package bcast import ( "context" + "time" "github.com/libp2p/go-libp2p/core/peer" "google.golang.org/protobuf/proto" @@ -14,6 +15,8 @@ const ( protocolIDPrefix = "/charon/dkg/bcast/1.0.0" protocolIDSig = protocolIDPrefix + "/sig" protocolIDMsg = protocolIDPrefix + "/msg" + receiveTimeout = time.Minute // Allow for peers to be out of sync, with some sending messages much earlier and having to wait. + sendTimeout = receiveTimeout + 2*time.Second // Allow for server to timeout first. ) // hashFunc is a function that hashes a any-wrapped protobuf message. 
diff --git a/dkg/bcast/server.go b/dkg/bcast/server.go index a7492f204..6cc9d0a87 100644 --- a/dkg/bcast/server.go +++ b/dkg/bcast/server.go @@ -30,12 +30,14 @@ func newServer(tcpNode host.Host, signFunc signFunc, verifyFunc verifyFunc) *ser func() proto.Message { return new(pb.BCastSigRequest) }, s.handleSigRequest, p2p.WithDelimitedProtocol(protocolIDSig), + p2p.WithReceiveTimeout(receiveTimeout), ) p2p.RegisterHandler("bcast", tcpNode, protocolIDMsg, func() proto.Message { return new(pb.BCastMessage) }, s.handleMessage, p2p.WithDelimitedProtocol(protocolIDMsg), + p2p.WithReceiveTimeout(receiveTimeout), ) return s diff --git a/dkg/dkg.go b/dkg/dkg.go index 4e3b07628..4c34ef79e 100644 --- a/dkg/dkg.go +++ b/dkg/dkg.go @@ -65,6 +65,7 @@ type TestConfig struct { StoreKeysFunc func(secrets []tbls.PrivateKey, dir string) error TCPNodeCallback func(host.Host) ShutdownCallback func() + SyncOpts []func(*sync.Client) } // HasTestConfig returns true if any of the test config fields are set. @@ -183,7 +184,11 @@ func Run(ctx context.Context, conf Config) (err error) { return errors.Wrap(err, "get peer IDs") } - ex := newExchanger(tcpNode, nodeIdx.PeerIdx, peerIds, def.NumValidators) + ex := newExchanger(tcpNode, nodeIdx.PeerIdx, peerIds, def.NumValidators, []sigType{ + sigLock, + sigDepositData, + sigValidatorRegistration, + }) // Register Frost libp2p handlers peerMap := make(map[peer.ID]cluster.NodeIdx) @@ -208,7 +213,7 @@ func Run(ctx context.Context, conf Config) (err error) { // Improve UX of "context cancelled" errors when sync fails. 
ctx = errors.WithCtxErr(ctx, "p2p connection failed, please retry DKG") - stopSync, err := startSyncProtocol(ctx, tcpNode, key, def.DefinitionHash, peerIds, cancel, conf.TestConfig.SyncCallback) + nextStepSync, stopSync, err := startSyncProtocol(ctx, tcpNode, key, def.DefinitionHash, peerIds, cancel, conf.TestConfig) if err != nil { return err } @@ -238,6 +243,11 @@ func Run(ctx context.Context, conf Config) (err error) { return errors.New("unsupported dkg algorithm") } + // DKG was step 1, advance to step 2 + if err := nextStepSync(ctx); err != nil { + return err + } + // Sign, exchange and aggregate Deposit Data depositDatas, err := signAndAggDepositData(ctx, ex, shares, def.WithdrawalAddresses(), network, nodeIdx) if err != nil { @@ -245,6 +255,10 @@ func Run(ctx context.Context, conf Config) (err error) { } log.Debug(ctx, "Aggregated deposit data signatures") + // Deposit data was step 2, advance to step 3 + if err := nextStepSync(ctx); err != nil { + return err + } // Sign, exchange and aggregate builder validator registration signatures. 
valRegs, err := signAndAggValidatorRegistrations( @@ -261,6 +275,10 @@ func Run(ctx context.Context, conf Config) (err error) { } log.Debug(ctx, "Aggregated builder validator registration signatures") + // Pre-regs was step 3, advance to step 4 + if err := nextStepSync(ctx); err != nil { + return err + } // Sign, exchange and aggregate Lock Hash signatures lock, err := signAndAggLockHash(ctx, shares, def, nodeIdx, ex, depositDatas, valRegs) @@ -268,6 +286,12 @@ func Run(ctx context.Context, conf Config) (err error) { return err } + log.Debug(ctx, "Aggregated lock hash signatures") + // Lock hash aggregate was step 4, advance to step 5 + if err := nextStepSync(ctx); err != nil { + return err + } + // Sign, exchange K1 signatures over Lock Hash lock.NodeSignatures, err = nodeSigCaster.exchange(ctx, key, lock.LockHash) if err != nil { @@ -278,16 +302,17 @@ func Run(ctx context.Context, conf Config) (err error) { lock.NodeSignatures = nil } + log.Debug(ctx, "Exchanged node signatures") + // Node signatures was step 5, advance to step 6 + if err := nextStepSync(ctx); err != nil { + return err + } + if !conf.NoVerify { if err := lock.VerifySignatures(); err != nil { return errors.Wrap(err, "invalid lock file") } } - log.Debug(ctx, "Aggregated lock hash signatures") - - if err = stopSync(ctx); err != nil { - return errors.Wrap(err, "sync shutdown") // Consider increasing --shutdown-delay if this occurs often. - } // Write keystores, deposit data and cluster lock files after exchange of partial signatures in order // to prevent partial data writes in case of peer connection lost @@ -304,8 +329,13 @@ func Run(ctx context.Context, conf Config) (err error) { log.Debug(ctx, "Saved keyshares to disk") } + // dashboardURL is the Launchpad dashboard url for a given lock file. + // If empty, either conf.Publish wasn't specified or there was a processing error in publishing + // the generated lock file. 
+ var dashboardURL string + if conf.Publish { - if err = writeLockToAPI(ctx, conf.PublishAddr, lock); err != nil { + if dashboardURL, err = writeLockToAPI(ctx, conf.PublishAddr, lock); err != nil { log.Warn(ctx, "Couldn't publish lock file to Obol API", err) } } @@ -320,7 +350,15 @@ func Run(ctx context.Context, conf Config) (err error) { } log.Debug(ctx, "Saved deposit data file to disk") - // TODO(corver): Improve graceful shutdown, see https://github.com/ObolNetwork/charon/issues/887 + // Signature verification and disk key write was step 6, advance to step 7 + if err := nextStepSync(ctx); err != nil { + return err + } + + if err = stopSync(ctx); err != nil { + return errors.Wrap(err, "sync shutdown") // Consider increasing --shutdown-delay if this occurs often. + } + if conf.TestConfig.ShutdownCallback != nil { conf.TestConfig.ShutdownCallback() } @@ -329,6 +367,10 @@ func Run(ctx context.Context, conf Config) (err error) { log.Info(ctx, "Successfully completed DKG ceremony 🎉") + if dashboardURL != "" { + log.Info(ctx, fmt.Sprintf("You can find your newly-created cluster dashboard here: %s", dashboardURL)) + } + return nil } @@ -380,17 +422,17 @@ func setupP2P(ctx context.Context, key *k1.PrivateKey, conf Config, peers []p2p. }, nil } -// startSyncProtocol sets up a sync protocol server and clients for each peer and returns a shutdown function +// startSyncProtocol sets up a sync protocol server and clients for each peer and returns a step sync and shutdown functions // when all peers are connected. func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *k1.PrivateKey, defHash []byte, - peerIDs []peer.ID, onFailure func(), testCallback func(connected int, id peer.ID), -) (func(context.Context) error, error) { + peerIDs []peer.ID, onFailure func(), testConfig TestConfig, +) (func(context.Context) error, func(context.Context) error, error) { // Sign definition hash with charon-enr-private-key // Note: libp2p signing does another hash of the defHash. 
hashSig, err := ((*libp2pcrypto.Secp256k1PrivateKey)(key)).Sign(defHash) if err != nil { - return nil, errors.Wrap(err, "sign definition hash") + return nil, nil, errors.Wrap(err, "sign definition hash") } // DKG compatibility is minor version dependent. @@ -406,7 +448,8 @@ func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *k1.PrivateKe } ctx := log.WithCtx(ctx, z.Str("peer", p2p.PeerName(pID))) - client := sync.NewClient(tcpNode, pID, hashSig, minorVersion) + + client := sync.NewClient(tcpNode, pID, hashSig, minorVersion, testConfig.SyncOpts...) clients = append(clients, client) go func() { @@ -422,11 +465,11 @@ func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *k1.PrivateKe for { // Return if there is a context error. if ctx.Err() != nil { - return nil, ctx.Err() + return nil, nil, ctx.Err() } if err := server.Err(); err != nil { - return nil, errors.Wrap(err, "sync server error") + return nil, nil, errors.Wrap(err, "sync server error") } var connectedCount int @@ -436,8 +479,8 @@ func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *k1.PrivateKe } } - if testCallback != nil { - testCallback(connectedCount, tcpNode.ID()) + if testConfig.SyncCallback != nil { + testConfig.SyncCallback(connectedCount, tcpNode.ID()) } // Break if all clients are connected @@ -456,11 +499,33 @@ func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *k1.PrivateKe err = server.AwaitAllConnected(ctx) if err != nil { - return nil, err + return nil, nil, err + } + + var step int + stepSyncFunc := func(ctx context.Context) error { + // Start next step ourselves by incrementing our step client side + step++ + for _, client := range clients { + client.SetStep(step) + } + + log.Debug(ctx, "Waiting for peers to start next step", z.Int("step", step)) + + if err := server.AwaitAllAtStep(ctx, step); err != nil { + return errors.Wrap(err, "sync step", z.Int("step", step)) + } + + return nil + } + + // All peer start on step 0, so advance 
to step 1. + if err := stepSyncFunc(ctx); err != nil { + return nil, nil, err } // Shutdown function stops all clients and server - return func(ctx context.Context) error { + shutdownFunc := func(ctx context.Context) error { for _, client := range clients { err := client.Shutdown(ctx) if err != nil { @@ -469,7 +534,9 @@ func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *k1.PrivateKe } return server.AwaitAllShutdown(ctx) - }, nil + } + + return stepSyncFunc, shutdownFunc, nil } // signAndAggLockHash returns cluster lock file with aggregated signature after signing, exchange and aggregation of partial signatures. @@ -966,17 +1033,20 @@ func createDistValidators(shares []share, depositDatas []eth2p0.DepositData, val return dvs, nil } -// writeLockToAPI posts the lock file to obol-api. -func writeLockToAPI(ctx context.Context, publishAddr string, lock cluster.Lock) error { - cl := obolapi.New(publishAddr) +// writeLockToAPI posts the lock file to obol-api and returns the Launchpad dashboard URL. +func writeLockToAPI(ctx context.Context, publishAddr string, lock cluster.Lock) (string, error) { + cl, err := obolapi.New(publishAddr) + if err != nil { + return "", err + } if err := cl.PublishLock(ctx, lock); err != nil { - return err + return "", err } log.Debug(ctx, "Published lock file to api") - return nil + return cl.LaunchpadURLForLock(lock), nil } // validateKeymanagerFlags returns an error if one keymanager flag is present but the other is not. 
diff --git a/dkg/dkg_test.go b/dkg/dkg_test.go index f0a5d35eb..cde1d9641 100644 --- a/dkg/dkg_test.go +++ b/dkg/dkg_test.go @@ -127,6 +127,7 @@ func testDKG(t *testing.T, def cluster.Definition, dir string, p2pKeys []*k1.Pri return keystore.StoreKeysInsecure(secrets, dir, keystore.ConfirmInsecureKeys) }, ShutdownCallback: shutdownSync, + SyncOpts: []func(*dkgsync.Client){dkgsync.WithPeriod(time.Millisecond * 50)}, }, } diff --git a/dkg/dkgpb/v1/sync.pb.go b/dkg/dkgpb/v1/sync.pb.go index 8f0753a46..c32b01e02 100644 --- a/dkg/dkgpb/v1/sync.pb.go +++ b/dkg/dkgpb/v1/sync.pb.go @@ -30,6 +30,7 @@ type MsgSync struct { HashSignature []byte `protobuf:"bytes,2,opt,name=hash_signature,json=hashSignature,proto3" json:"hash_signature,omitempty"` Shutdown bool `protobuf:"varint,3,opt,name=shutdown,proto3" json:"shutdown,omitempty"` Version string `protobuf:"bytes,4,opt,name=version,proto3" json:"version,omitempty"` + Step int64 `protobuf:"varint,5,opt,name=step,proto3" json:"step,omitempty"` } func (x *MsgSync) Reset() { @@ -92,6 +93,13 @@ func (x *MsgSync) GetVersion() string { return "" } +func (x *MsgSync) GetStep() int64 { + if x != nil { + return x.Step + } + return 0 +} + type MsgSyncResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -154,7 +162,7 @@ var file_dkg_dkgpb_v1_sync_proto_rawDesc = []byte{ 0x79, 0x6e, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x64, 0x6b, 0x67, 0x2e, 0x64, 0x6b, 0x67, 0x70, 0x62, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa0, 0x01, 0x0a, 0x07, 0x4d, 0x73, 0x67, + 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb4, 0x01, 0x0a, 0x07, 0x4d, 0x73, 0x67, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, @@ -164,17 +172,19 @@ var file_dkg_dkgpb_v1_sync_proto_rawDesc = []byte{ 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x73, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x6a, 0x0a, 0x0f, 0x4d, - 0x73, 0x67, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, - 0x0a, 0x0e, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x0d, 0x73, 0x79, 0x6e, 0x63, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x62, 0x6f, 0x6c, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, - 0x6b, 0x2f, 0x63, 0x68, 0x61, 0x72, 0x6f, 0x6e, 0x2f, 0x64, 0x6b, 0x67, 0x2f, 0x64, 0x6b, 0x67, - 0x70, 0x62, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x73, + 0x74, 0x65, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x73, 0x74, 0x65, 0x70, 0x22, + 0x6a, 0x0a, 0x0f, 0x4d, 0x73, 0x67, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x41, 0x0a, 0x0e, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0d, 0x73, 0x79, 0x6e, 0x63, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x2c, 0x5a, 0x2a, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x62, 0x6f, 0x6c, 0x6e, 0x65, + 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x63, 0x68, 0x61, 0x72, 0x6f, 0x6e, 0x2f, 0x64, 0x6b, 0x67, + 0x2f, 0x64, 0x6b, 0x67, 0x70, 0x62, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( diff --git a/dkg/dkgpb/v1/sync.proto b/dkg/dkgpb/v1/sync.proto index 4d44e0309..8cf517c81 100644 --- a/dkg/dkgpb/v1/sync.proto +++ b/dkg/dkgpb/v1/sync.proto @@ -11,6 +11,7 @@ message MsgSync { bytes hash_signature = 2; bool shutdown = 3; string version = 4; + int64 step = 5; } message MsgSyncResponse { diff --git a/dkg/exchanger.go b/dkg/exchanger.go index 331723cb7..0943c2387 100644 --- a/dkg/exchanger.go +++ b/dkg/exchanger.go @@ -4,10 +4,14 @@ package dkg import ( "context" + "sync" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/core/parsigdb" "github.com/obolnetwork/charon/core/parsigex" @@ -21,41 +25,90 @@ import ( type sigType int const ( - // dutyLock is responsible for lock hash signed partial signatures exchange and aggregation. + // sigLock is responsible for lock hash signed partial signatures exchange and aggregation. sigLock sigType = 101 - // dutyDepositData is responsible for deposit data signed partial signatures exchange and aggregation. 
+ // sigDepositData is responsible for deposit data signed partial signatures exchange and aggregation. sigDepositData sigType = 102 // sigValidatorRegistration is responsible for the pre-generated validator registration exchange and aggregation. sigValidatorRegistration sigType = 103 ) -// sigData includes the fields obtained from sigdb when threshold is reached. -type sigData struct { - sigType sigType - pubkey core.PubKey - psigs []core.ParSignedData +// sigTypeStore is a shorthand for a map of sigType to map of core.PubKey to slice of core.ParSignedData. +type sigTypeStore map[sigType]map[core.PubKey][]core.ParSignedData + +// dataByPubkey maps a sigType to its map of public key to slice of core.ParSignedData. +type dataByPubkey struct { + numVals int + store sigTypeStore + lock sync.Mutex +} + +// set sets data for the given sigType and core.PubKey. +func (stb *dataByPubkey) set(pubKey core.PubKey, sigType sigType, data []core.ParSignedData) { + stb.lock.Lock() + defer stb.lock.Unlock() + + _, ok := stb.store[sigType] + if !ok { + stb.store[sigType] = map[core.PubKey][]core.ParSignedData{} + } + + stb.store[sigType][pubKey] = data +} + +// get returns a copy of the core.ParSignedData stored per core.PubKey for the given sigType, or false if data for all numVals public keys is not yet present. +func (stb *dataByPubkey) get(sigType sigType) (map[core.PubKey][]core.ParSignedData, bool) { + stb.lock.Lock() + defer stb.lock.Unlock() + + data, ok := stb.store[sigType] + if !ok { + return nil, ok + } + + if len(data) != stb.numVals { + return nil, false + } + + ret := make(map[core.PubKey][]core.ParSignedData) + + for k, v := range data { + ret[k] = v + } + + return ret, ok } // exchanger is responsible for exchanging partial signatures between peers on libp2p.
type exchanger struct { - sigChan chan sigData - sigex *parsigex.ParSigEx - sigdb *parsigdb.MemDB - numVals int + sigex *parsigex.ParSigEx + sigdb *parsigdb.MemDB + sigTypes map[sigType]bool + sigData dataByPubkey } -func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int) *exchanger { +func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int, sigTypes []sigType) *exchanger { // Partial signature roots not known yet, so skip verification in parsigex, rather verify before we aggregate. noopVerifier := func(ctx context.Context, duty core.Duty, key core.PubKey, data core.ParSignedData) error { return nil } + st := make(map[sigType]bool) + + for _, sigType := range sigTypes { + st[sigType] = true + } + ex := &exchanger{ // threshold is len(peers) to wait until we get all the partial sigs from all the peers per DV - sigdb: parsigdb.NewMemDB(len(peers), noopDeadliner{}), - sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier), - sigChan: make(chan sigData, vals), // Allow buffering all signature sets - numVals: vals, + sigdb: parsigdb.NewMemDB(len(peers), noopDeadliner{}), + sigex: parsigex.NewParSigEx(tcpNode, p2p.Send, peerIdx, peers, noopVerifier), + sigTypes: st, + sigData: dataByPubkey{ + store: sigTypeStore{}, + numVals: vals, + lock: sync.Mutex{}, + }, } // Wiring core workflow components @@ -76,36 +129,32 @@ func (e *exchanger) exchange(ctx context.Context, sigType sigType, set core.ParS return nil, err } - sets := make(map[core.PubKey][]core.ParSignedData) + tick := time.NewTicker(50 * time.Millisecond) + defer tick.Stop() + for { select { + case <-tick.C: + // We are done when we have ParSignedData of all the DVs from each peer + if data, ok := e.sigData.get(sigType); ok { + return data, nil + } case <-ctx.Done(): return nil, ctx.Err() - case peerSet := <-e.sigChan: - if sigType != peerSet.sigType { - // Do nothing if duty doesn't match - continue - } - sets[peerSet.pubkey] = peerSet.psigs - } -
- // We are done when we have ParSignedData of all the DVs from all each peer - if len(sets) == e.numVals { - break } } - - return sets, nil } // pushPsigs is responsible for writing partial signature data to sigChan obtained from other peers. func (e *exchanger) pushPsigs(_ context.Context, duty core.Duty, pk core.PubKey, psigs []core.ParSignedData) error { - e.sigChan <- sigData{ - sigType: sigType(duty.Slot), - pubkey: pk, - psigs: psigs, + sigType := sigType(duty.Slot) + + if !e.sigTypes[sigType] { + return errors.New("unrecognized sigType", z.Int("sigType", int(sigType))) } + e.sigData.set(pk, sigType, psigs) + return nil } diff --git a/dkg/exchanger_internal_test.go b/dkg/exchanger_internal_test.go index c15ab68a7..0bff8647f 100644 --- a/dkg/exchanger_internal_test.go +++ b/dkg/exchanger_internal_test.go @@ -58,6 +58,12 @@ func TestExchanger(t *testing.T) { hosts []host.Host hostsInfo []peer.AddrInfo exchangers []*exchanger + + expectedSigTypes = []sigType{ + sigLock, + sigDepositData, + sigValidatorRegistration, + } ) // Create hosts @@ -83,12 +89,46 @@ func TestExchanger(t *testing.T) { } for i := 0; i < nodes; i++ { - ex := newExchanger(hosts[i], i, peers, dvs) + ex := newExchanger(hosts[i], i, peers, dvs, expectedSigTypes) exchangers = append(exchangers, ex) } - respChan := make(chan map[core.PubKey][]core.ParSignedData) + type respStruct struct { + data map[core.PubKey][]core.ParSignedData + sigType sigType + } + + respChan := make(chan respStruct) var wg sync.WaitGroup + + // send multiple (supported) messages at the same time, showing that exchanger can exchange messages of various + // sigTypes concurrently + for i := 0; i < nodes; i++ { + wg.Add(2) + go func(node int) { + defer wg.Done() + + data, err := exchangers[node].exchange(ctx, sigDepositData, dataToBeSent[node]) + require.NoError(t, err) + + respChan <- respStruct{ + data: data, + sigType: sigDepositData, + } + }(i) + go func(node int) { + defer wg.Done() + + data, err := 
exchangers[node].exchange(ctx, sigValidatorRegistration, dataToBeSent[node]) + require.NoError(t, err) + + respChan <- respStruct{ + data: data, + sigType: sigValidatorRegistration, + } + }(i) + } + for i := 0; i < nodes; i++ { wg.Add(1) go func(node int) { @@ -97,7 +137,10 @@ func TestExchanger(t *testing.T) { data, err := exchangers[node].exchange(ctx, sigLock, dataToBeSent[node]) require.NoError(t, err) - respChan <- data + respChan <- respStruct{ + data: data, + sigType: sigLock, + } }(i) } @@ -106,12 +149,22 @@ func TestExchanger(t *testing.T) { close(respChan) // Closes response channel once all the goroutines are done with writing. }() - var actual []map[core.PubKey][]core.ParSignedData + actual := make(sigTypeStore) for res := range respChan { - actual = append(actual, res) + actual[res.sigType] = res.data } - for i := 0; i < nodes; i++ { - reflect.DeepEqual(actual[i], expectedData) + // test that data we expected arrived, for each sigType + for _, data := range actual { + reflect.DeepEqual(data, expectedData) } + + // test that all sigTypes expected to arrive actually arrived + for _, expectedSigType := range expectedSigTypes { + _, ok := actual[expectedSigType] + require.True(t, ok, "missing sigType %d from received data", expectedSigType) + } + + // require that we encountered all the sigTypes expected + require.Len(t, actual, len(expectedSigTypes)) } diff --git a/dkg/sync/client.go b/dkg/sync/client.go index 27493fed9..d810a5dc9 100644 --- a/dkg/sync/client.go +++ b/dkg/sync/client.go @@ -21,9 +21,16 @@ import ( "github.com/obolnetwork/charon/p2p" ) +// WithPeriod sets the period between pings. +func WithPeriod(period time.Duration) func(*Client) { + return func(c *Client) { + c.period = period + } +} + // NewClient returns a new Client instance. 
-func NewClient(tcpNode host.Host, peer peer.ID, hashSig []byte, version version.SemVer) *Client { - return &Client{ +func NewClient(tcpNode host.Host, peer peer.ID, hashSig []byte, version version.SemVer, opts ...func(*Client)) *Client { + c := &Client{ tcpNode: tcpNode, peer: peer, hashSig: hashSig, @@ -31,7 +38,14 @@ func NewClient(tcpNode host.Host, peer peer.ID, hashSig []byte, version version. done: make(chan struct{}), reconnect: true, version: version, + period: 250 * time.Millisecond, + } + + for _, opt := range opts { + opt(c) } + + return c } // Client is the client side of the sync protocol. It retries establishing a connection to a sync server, @@ -42,6 +56,7 @@ type Client struct { mu sync.Mutex connected bool reconnect bool + step int shutdown chan struct{} done chan struct{} @@ -50,6 +65,7 @@ type Client struct { version version.SemVer tcpNode host.Host peer peer.ID + period time.Duration } // Run blocks while running the client-side sync protocol. It tries to reconnect if relay connection is dropped or @@ -87,6 +103,22 @@ func (c *Client) Run(ctx context.Context) error { } } +// SetStep sets the current step. +func (c *Client) SetStep(step int) { + c.mu.Lock() + defer c.mu.Unlock() + + c.step = step +} + +// getStep returns the current step. +func (c *Client) getStep() int { + c.mu.Lock() + defer c.mu.Unlock() + + return c.step +} + // IsConnected returns if client is connected to the server or not. func (c *Client) IsConnected() bool { c.mu.Lock() @@ -126,7 +158,7 @@ func (c *Client) clearConnected() { // sendMsgs sends period sync protocol messages on the stream until error or shutdown. 
func (c *Client) sendMsgs(ctx context.Context, stream network.Stream) (relayBroke bool, connBroke bool, err error) { - timer := time.NewTicker(time.Second) + timer := time.NewTicker(c.period) defer timer.Stop() first := make(chan struct{}, 1) @@ -167,6 +199,7 @@ func (c *Client) sendMsg(stream network.Stream, shutdown bool) (*pb.MsgSyncRespo HashSignature: c.hashSig, Shutdown: shutdown, Version: c.version.String(), + Step: int64(c.getStep()), } if err := writeSizedProto(stream, msg); err != nil { diff --git a/dkg/sync/server.go b/dkg/sync/server.go index e0115d9b8..7496ade84 100644 --- a/dkg/sync/server.go +++ b/dkg/sync/server.go @@ -42,6 +42,7 @@ func NewServer(tcpNode host.Host, allCount int, defHash []byte, version version. allCount: allCount, shutdown: make(map[peer.ID]struct{}), connected: make(map[peer.ID]struct{}), + steps: make(map[peer.ID]int), version: version, } } @@ -59,6 +60,7 @@ type Server struct { mu sync.Mutex shutdown map[peer.ID]struct{} connected map[peer.ID]struct{} + steps map[peer.ID]int err error // To return error and exit anywhere in the server flow } @@ -117,6 +119,29 @@ func (s *Server) AwaitAllShutdown(ctx context.Context) error { } } +// AwaitAllAtStep blocks until all peers have reported to be at the given step or returns an error. +func (s *Server) AwaitAllAtStep(ctx context.Context, step int) error { + timer := time.NewTicker(time.Millisecond) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + if err := s.Err(); err != nil { + return err + } + + if ok, err := s.isAllAtStep(step); err != nil { + return err + } else if ok { + return nil + } + } + } +} + // isConnected returns the shared connected state for the peer. func (s *Server) isConnected(pID peer.ID) bool { s.mu.Lock() @@ -145,6 +170,14 @@ func (s *Server) setShutdown(pID peer.ID) { s.shutdown[pID] = struct{}{} } +// setStep sets the peer's reported step. 
+func (s *Server) setStep(pID peer.ID, step int) { + s.mu.Lock() + defer s.mu.Unlock() + + s.steps[pID] = step +} + // isAllConnected returns if all expected peers are connected. func (s *Server) isAllConnected() bool { s.mu.Lock() @@ -161,6 +194,30 @@ func (s *Server) isAllShutdown() bool { return len(s.shutdown) == s.allCount } +// isAllAtStep returns if all peers are reporting to be at the given or the next step. +// Allowing next step is required since atomic step increases are impossible in distributed systems +// so one peer will always increment first putting it ahead of the others. At least we know all peers +// are or were at the given step. +func (s *Server) isAllAtStep(step int) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.steps) != s.allCount { + return false, nil + } + + for _, actual := range s.steps { + if actual >= step+2 { + return false, errors.New("peer step is too far ahead", z.Int("peer_step", actual), z.Int("local_step", step)) + } + if actual != step && actual != step+1 { + return false, nil + } + } + + return true, nil +} + // clearConnected clears connected state for the given peer. 
func (s *Server) clearConnected(pID peer.ID) { s.mu.Lock() @@ -205,6 +262,8 @@ func (s *Server) handleStream(ctx context.Context, stream network.Stream) error log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", count, s.allCount)) } + s.setStep(pID, int(msg.Step)) + // Write response message if err := writeSizedProto(stream, resp); err != nil { return err diff --git a/dkg/sync/sync_test.go b/dkg/sync/sync_test.go index 55b879e7f..96f6d8625 100644 --- a/dkg/sync/sync_test.go +++ b/dkg/sync/sync_test.go @@ -97,7 +97,7 @@ func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr hashSig, err := keys[i].Sign(hash) require.NoError(t, err) - client := sync.NewClient(tcpNodes[i], tcpNodes[j].ID(), hashSig, versions[i]) + client := sync.NewClient(tcpNodes[i], tcpNodes[j].ID(), hashSig, versions[i], sync.WithPeriod(time.Millisecond*100)) clients = append(clients, client) ctx := log.WithTopic(ctx, fmt.Sprintf("client%d_%d", i, j)) @@ -132,6 +132,14 @@ func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr return } + for i := 0; i < 5; i++ { + assertAllAtStep(ctx, t, servers, i) + + for _, client := range clients { + client.SetStep(i + 1) + } + } + t.Log("client.IsConnected") for _, client := range clients { require.True(t, client.IsConnected()) @@ -150,6 +158,19 @@ func testCluster(t *testing.T, n int, versions map[int]version.SemVer, expectErr } } +func assertAllAtStep(ctx context.Context, t *testing.T, servers []*sync.Server, step int) { + t.Helper() + for _, server := range servers { + err := server.AwaitAllAtStep(ctx, step) + require.NoError(t, err) + + checkCtx, cancel := context.WithTimeout(ctx, time.Millisecond*10) + err = server.AwaitAllAtStep(checkCtx, step+1) + require.ErrorIs(t, err, context.DeadlineExceeded) + cancel() + } +} + func newTCPNode(t *testing.T, seed int64) (host.Host, libp2pcrypto.PrivKey) { t.Helper() diff --git a/p2p/receive.go b/p2p/receive.go index 0966658cf..9ef0a1b2d 100644 --- 
a/p2p/receive.go +++ b/p2p/receive.go @@ -53,9 +53,8 @@ func RegisterHandler(logTopic string, tcpNode host.Host, pID protocol.ID, t0 := time.Now() name := PeerName(s.Conn().RemotePeer()) - timeout := time.Second * 5 - _ = s.SetReadDeadline(time.Now().Add(timeout)) - ctx, cancel := context.WithTimeout(context.Background(), timeout) + _ = s.SetReadDeadline(time.Now().Add(o.receiveTimeout)) + ctx, cancel := context.WithTimeout(context.Background(), o.receiveTimeout) ctx = log.WithTopic(ctx, logTopic) ctx = log.WithCtx(ctx, z.Str("peer", name), diff --git a/p2p/sender.go b/p2p/sender.go index e4d1dc83b..9d80f6e38 100644 --- a/p2p/sender.go +++ b/p2p/sender.go @@ -21,9 +21,11 @@ import ( ) const ( - senderHysteresis = 3 - senderBuffer = senderHysteresis + 1 - maxMsgSize = 128 << 20 // 128MB + senderHysteresis = 3 + senderBuffer = senderHysteresis + 1 + maxMsgSize = 128 << 20 // 128MB + defaultRcvTimeout = time.Second * 5 + defaultSendTimeout = defaultRcvTimeout + 2*time.Second // Allow for up to 1s hop latency (2s RTT) ) // SendFunc is an abstract function responsible for sending libp2p messages. @@ -147,6 +149,22 @@ type sendRecvOpts struct { writersByProtocol map[protocol.ID]func(network.Stream) pbio.Writer readersByProtocol map[protocol.ID]func(network.Stream) pbio.Reader rttCallback func(time.Duration) + receiveTimeout time.Duration + sendTimeout time.Duration +} + +// WithReceiveTimeout returns an option for SendReceive that sets a timeout for handling incoming messages. +func WithReceiveTimeout(timeout time.Duration) func(*sendRecvOpts) { + return func(opts *sendRecvOpts) { + opts.receiveTimeout = timeout + } +} + +// WithSendTimeout returns an option for SendReceive that sets a timeout for sending messages. +func WithSendTimeout(timeout time.Duration) func(*sendRecvOpts) { + return func(opts *sendRecvOpts) { + opts.sendTimeout = timeout + } } // WithSendReceiveRTT returns an option for SendReceive that sets a callback for the RTT. 
@@ -175,7 +193,9 @@ func defaultSendRecvOpts(pID protocol.ID) sendRecvOpts { readersByProtocol: map[protocol.ID]func(s network.Stream) pbio.Reader{ pID: func(s network.Stream) pbio.Reader { return legacyReadWriter{s} }, }, - rttCallback: func(time.Duration) {}, + rttCallback: func(time.Duration) {}, + receiveTimeout: defaultRcvTimeout, + sendTimeout: defaultSendTimeout, } } @@ -200,6 +220,9 @@ func SendReceive(ctx context.Context, tcpNode host.Host, peerID peer.ID, if err != nil { return errors.Wrap(err, "new stream", z.Any("protocols", o.protocols)) } + if err := s.SetDeadline(time.Now().Add(o.sendTimeout)); err != nil { + return errors.Wrap(err, "set deadline") + } writeFunc, ok := o.writersByProtocol[s.Protocol()] if !ok { @@ -257,6 +280,9 @@ func Send(ctx context.Context, tcpNode host.Host, protoID protocol.ID, peerID pe if err != nil { return errors.Wrap(err, "tcpNode stream") } + if err := s.SetDeadline(time.Now().Add(o.sendTimeout)); err != nil { + return errors.Wrap(err, "set deadline") + } writeFunc, ok := o.writersByProtocol[s.Protocol()] if !ok { diff --git a/p2p/sender_test.go b/p2p/sender_test.go index 1c9b93c56..6c6c16ed5 100644 --- a/p2p/sender_test.go +++ b/p2p/sender_test.go @@ -5,6 +5,7 @@ package p2p_test import ( "context" "testing" + "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -19,6 +20,42 @@ import ( "github.com/obolnetwork/charon/testutil" ) +func TestWithReceiveTimeout(t *testing.T) { + server := testutil.CreateHost(t, testutil.AvailableAddr(t)) + client := testutil.CreateHost(t, testutil.AvailableAddr(t)) + + client.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Hour) + + protocolID := protocol.ID("testprotocol") + p2p.RegisterHandler("test", server, protocolID, func() proto.Message { return new(pbv1.Duty) }, + func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) { + require.Error(t, ctx.Err()) // Assert the context has been closed already since 
0 timeout. + return nil, false, nil + }, p2p.WithReceiveTimeout(0)) + + err := p2p.SendReceive(context.Background(), client, server.ID(), new(pbv1.Duty), new(pbv1.Duty), protocolID) + require.Error(t, err) + require.ErrorContains(t, err, "no or zero response received") +} + +func TestWithSendTimeout(t *testing.T) { + server := testutil.CreateHost(t, testutil.AvailableAddr(t)) + client := testutil.CreateHost(t, testutil.AvailableAddr(t)) + + client.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Hour) + + protocolID := protocol.ID("testprotocol") + p2p.RegisterHandler("test", server, protocolID, func() proto.Message { return new(pbv1.Duty) }, + func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) { + return nil, false, nil + }) + + err := p2p.SendReceive(context.Background(), client, server.ID(), + new(pbv1.Duty), new(pbv1.Duty), protocolID, p2p.WithSendTimeout(0)) + require.Error(t, err) + require.ErrorContains(t, err, "deadline") +} + func TestSend(t *testing.T) { var ( undelimID = protocol.ID("undelimited") diff --git a/testutil/compose/compose/main.go b/testutil/compose/compose/main.go index 0446b64ec..2d6e20b6e 100644 --- a/testutil/compose/compose/main.go +++ b/testutil/compose/compose/main.go @@ -138,7 +138,8 @@ func newNewCmd() *cobra.Command { nodes := cmd.Flags().Int("nodes", conf.NumNodes, "Number of charon nodes in the cluster.") insecureKeys := cmd.Flags().Bool("insecure-keys", conf.InsecureKeys, "To generate keys quickly.") slotDuration := cmd.Flags().Duration("simnet-slot-duration", time.Second, "Configures slot duration in simnet beacon mock.") - fuzz := cmd.Flags().Bool("fuzz", false, "Configures simnet beaconmock to return fuzzed responses.") + beaconFuzz := cmd.Flags().Bool("beacon-fuzz", false, "Configures simnet beaconmock to return fuzzed responses.") + p2pFuzz := cmd.Flags().Bool("p2p-fuzz", false, "Configures charon p2p network to return fuzzed responses of one of the nodes in the cluster.") 
cmd.RunE = func(cmd *cobra.Command, _ []string) error { conf.KeyGen = compose.KeyGen(*keygen) @@ -152,7 +153,8 @@ func newNewCmd() *cobra.Command { conf.Threshold = cluster.Threshold(conf.NumNodes) conf.InsecureKeys = *insecureKeys conf.SlotDuration = *slotDuration - conf.Fuzz = *fuzz + conf.BeaconFuzz = *beaconFuzz + conf.P2PFuzz = *p2pFuzz if conf.BuildLocal { conf.ImageTag = "local" diff --git a/testutil/compose/config.go b/testutil/compose/config.go index e3551cc1c..3cbf5e459 100644 --- a/testutil/compose/config.go +++ b/testutil/compose/config.go @@ -18,6 +18,7 @@ const ( charonImage = "obolnetwork/charon" containerBinary = "/usr/local/bin/charon" cmdRun = "run" + cmdUnsafeRun = "[unsafe,run]" cmdDKG = "dkg" cmdCreateCluster = "[create,cluster]" cmdCreateDKG = "[create,dkg]" @@ -109,8 +110,11 @@ type Config struct { // SlotDuration configures slot duration on simnet beacon mock for all the nodes in the cluster. SlotDuration time.Duration `json:"slot_duration"` - // Fuzz configures simnet beaconmock to return fuzzed responses. - Fuzz bool `json:"fuzz"` + // BeaconFuzz configures simnet beaconmock to return fuzzed responses. + BeaconFuzz bool `json:"beacon-fuzz"` + + // P2PFuzz configures charon p2p network to send and receive fuzzed messages. + P2PFuzz bool `json:"p2p-fuzz"` // SyntheticBlockProposals configures use of synthetic block proposals in simnet cluster. SyntheticBlockProposals bool `json:"synthetic_block_proposals"` diff --git a/testutil/compose/fuzz/beacon_fuzz_test.go b/testutil/compose/fuzz/beacon_fuzz_test.go deleted file mode 100644 index f6f44552d..000000000 --- a/testutil/compose/fuzz/beacon_fuzz_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright © 2022-2023 Obol Labs Inc. 
Licensed under the terms of a Business Source License 1.1 - -package fuzz_test - -import ( - "context" - "flag" - "fmt" - "os" - "path" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/obolnetwork/charon/testutil" - "github.com/obolnetwork/charon/testutil/compose" -) - -//go:generate go test . -run=TestBeaconFuzz -integration -v - -var ( - integration = flag.Bool("integration", false, "Enable docker based integration test") - sudoPerms = flag.Bool("sudo-perms", false, "Enables changing all compose artefacts file permissions using sudo.") - logDir = flag.String("log-dir", "", "Specifies the directory to store test docker-compose logs. Empty defaults to stdout.") - fuzzTimeout = flag.Duration("fuzz-timeout", time.Minute*10, "Specifies the duration of the beacon fuzz test.") -) - -func TestBeaconFuzz(t *testing.T) { - if !*integration { - t.Skip("Skipping beacon fuzz integration test") - } - - dir, err := os.MkdirTemp("", "") - require.NoError(t, err) - - conf := compose.NewDefaultConfig() - conf.SyntheticBlockProposals = true - conf.Fuzz = true - conf.DisableMonitoringPorts = true - conf.BuildLocal = true - conf.ImageTag = "local" - conf.InsecureKeys = true - require.NoError(t, compose.WriteConfig(dir, conf)) - - os.Args = []string{"cobra.test"} - - autoConfig := compose.AutoConfig{ - Dir: dir, - AlertTimeout: *fuzzTimeout, - SudoPerms: *sudoPerms, - } - - if *logDir != "" { - autoConfig.LogFile = path.Join(*logDir, fmt.Sprintf("%s.log", t.Name())) - } - - err = compose.Auto(context.Background(), autoConfig) - testutil.RequireNoError(t, err) -} diff --git a/testutil/compose/fuzz/fuzz_test.go b/testutil/compose/fuzz/fuzz_test.go new file mode 100644 index 000000000..7d1ce49cf --- /dev/null +++ b/testutil/compose/fuzz/fuzz_test.go @@ -0,0 +1,86 @@ +// Copyright © 2022-2023 Obol Labs Inc. 
Licensed under the terms of a Business Source License 1.1 + +package fuzz_test + +import ( + "context" + "flag" + "fmt" + "os" + "path" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/obolnetwork/charon/testutil" + "github.com/obolnetwork/charon/testutil/compose" +) + +var ( + fuzzer = flag.Bool("fuzzer", false, "Enables docker based fuzz tests") + sudoPerms = flag.Bool("sudo-perms", false, "Enables changing all compose artefacts file permissions using sudo.") + logDir = flag.String("log-dir", "", "Specifies the directory to store test docker-compose logs. Empty defaults to stdout.") +) + +func TestFuzzers(t *testing.T) { + if !*fuzzer { + t.Skip("Skipping fuzz tests") + } + + defaultConfig := compose.NewDefaultConfig() + defaultConfig.DisableMonitoringPorts = true + defaultConfig.BuildLocal = true + defaultConfig.ImageTag = "local" + defaultConfig.InsecureKeys = true + + tests := []struct { + name string + configFunc func(compose.Config) compose.Config + timeout time.Duration + }{ + { + name: "beacon_fuzz_tests", + configFunc: func(config compose.Config) compose.Config { + config.BeaconFuzz = true + + return config + }, + timeout: time.Minute * 20, + }, + { + name: "p2p_fuzz_tests", + configFunc: func(config compose.Config) compose.Config { + config.P2PFuzz = true + + return config + }, + timeout: time.Minute * 20, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + + config := test.configFunc(defaultConfig) + require.NoError(t, compose.WriteConfig(dir, config)) + + os.Args = []string{"cobra.test"} + + autoConfig := compose.AutoConfig{ + Dir: dir, + AlertTimeout: test.timeout, + SudoPerms: *sudoPerms, + } + + if *logDir != "" { + autoConfig.LogFile = path.Join(*logDir, fmt.Sprintf("%s.log", test.name)) + } + + err = compose.Auto(context.Background(), autoConfig) + testutil.RequireNoError(t, err) + }) + } +} diff --git 
a/testutil/compose/lock.go b/testutil/compose/lock.go index daf1749cb..b11db3421 100644 --- a/testutil/compose/lock.go +++ b/testutil/compose/lock.go @@ -138,7 +138,7 @@ func newNodeEnvs(index int, conf Config, vcType VCType) []kv { kv{"simnet-validator-mock", fmt.Sprintf(`"%v"`, vcType == VCMock)}, kv{"simnet-slot-duration", conf.SlotDuration.String()}, kv{"simnet-validator-keys-dir", fmt.Sprintf("/compose/node%d/validator_keys", index)}, - kv{"simnet-beacon-mock-fuzz", fmt.Sprintf(`"%v"`, conf.Fuzz)}, + kv{"simnet-beacon-mock-fuzz", fmt.Sprintf(`"%v"`, conf.BeaconFuzz)}, kv{"loki-addresses", "http://loki:3100/loki/api/v1/push"}, kv{"loki-service", fmt.Sprintf("node%d", index)}, kv{"synthetic-block-proposals", fmt.Sprintf(`"%v"`, conf.SyntheticBlockProposals)}, diff --git a/testutil/compose/run.go b/testutil/compose/run.go index e89e1f124..6f1faca62 100644 --- a/testutil/compose/run.go +++ b/testutil/compose/run.go @@ -41,10 +41,16 @@ func Run(ctx context.Context, dir string, conf Config) (TmplData, error) { nodes = append(nodes, n) } + charonCmd := cmdRun + if conf.P2PFuzz { + nodes[0].EnvVars = append(nodes[0].EnvVars, kv{"p2p-fuzz", fmt.Sprintf(`"%v"`, conf.P2PFuzz)}) + charonCmd = cmdUnsafeRun + } + data := TmplData{ ComposeDir: dir, CharonImageTag: conf.ImageTag, - CharonCommand: cmdRun, + CharonCommand: charonCmd, Nodes: nodes, Relay: true, Monitoring: conf.Monitoring, diff --git a/testutil/compose/testdata/TestNewDefaultConfig.golden b/testutil/compose/testdata/TestNewDefaultConfig.golden index 918601a79..d6f9494c7 100644 --- a/testutil/compose/testdata/TestNewDefaultConfig.golden +++ b/testutil/compose/testdata/TestNewDefaultConfig.golden @@ -19,7 +19,8 @@ "disable_monitoring_ports": false, "insecure_keys": false, "slot_duration": 1000000000, - "fuzz": false, + "beacon-fuzz": false, + "p2p-fuzz": false, "synthetic_block_proposals": true, "monitoring": true, "builder_api": false diff --git a/testutil/promrated/Dockerfile b/testutil/promrated/Dockerfile 
index 3bf74a8e1..c0cbfa4e2 100644 --- a/testutil/promrated/Dockerfile +++ b/testutil/promrated/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20.6-alpine AS builder +FROM golang:1.20.7-alpine AS builder # Install dependencies RUN apk add --no-cache build-base git