From 6113321669f1e0abbd7ef12aa1e797cbb5ace676 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Mon, 2 Sep 2024 11:32:12 +0200 Subject: [PATCH] fix: restart DS client (#53) * feat: restart DS client * feat: reset callback function * feat: enable and disable callback function * feat: remove cb nil function * feat: check DS is running * Add github workflow for resequence * Always upload logs * feat: update lib * feat: halt * feat: update lib release * feat: resetCurrentBatchData * feat: disable long running test --------- Co-authored-by: Jerry --- .github/workflows/test-resequence.yml | 100 ++++++++++++++++++++++++++ aggregator/aggregator.go | 77 +++++++++++++++----- go.mod | 4 +- go.sum | 4 +- 4 files changed, 162 insertions(+), 23 deletions(-) create mode 100644 .github/workflows/test-resequence.yml diff --git a/.github/workflows/test-resequence.yml b/.github/workflows/test-resequence.yml new file mode 100644 index 00000000..3e34cd63 --- /dev/null +++ b/.github/workflows/test-resequence.yml @@ -0,0 +1,100 @@ +name: Resequence test +on: + push: + branches: + # Disable test for the moment as it takes too long + - "this-test-is-disabled" + + +concurrency: + group: ${{ github.ref }} + cancel-in-progress: true + +jobs: + Resequence: + runs-on: ubuntu-latest + # TODO: Add "cdk-validium" once it's ready + # strategy: + # matrix: + # da-mode: [ "rollup" ] + steps: + - name: Checkout cdk + uses: actions/checkout@v4 + with: + path: cdk + + - name: Checkout cdk-erigon + uses: actions/checkout@v4 + with: + repository: 0xPolygonHermez/cdk-erigon + ref: banana + path: cdk-erigon + + - name: Checkout kurtosis-cdk + uses: actions/checkout@v4 + with: + repository: 0xPolygon/kurtosis-cdk + ref: 3debe0a4dd000e02f7e6bde3247432211bf0336f + path: kurtosis-cdk + + - name: Install Kurtosis CDK tools + uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk + + - name: Install Foundry + uses: 
foundry-rs/foundry-toolchain@v1 + + - name: Install yq + run: | + sudo curl -L https://github.com/mikefarah/yq/releases/download/v4.44.2/yq_linux_amd64 -o /usr/local/bin/yq + sudo chmod +x /usr/local/bin/yq + /usr/local/bin/yq --version + + - name: Install polycli + run: | + tmp_dir=$(mktemp -d) && curl -L https://github.com/0xPolygon/polygon-cli/releases/download/v0.1.48/polycli_v0.1.48_linux_amd64.tar.gz | tar -xz -C "$tmp_dir" && mv "$tmp_dir"/* /usr/local/bin/polycli && rm -rf "$tmp_dir" + sudo chmod +x /usr/local/bin/polycli + /usr/local/bin/polycli version + + - name: Build docker image + working-directory: ./cdk + run: docker build -t cdk:local --file Dockerfile . + + - name: Remove unused flags + working-directory: ./kurtosis-cdk + run: | + sed -i '/zkevm.sequencer-batch-seal-time:/d' templates/cdk-erigon/config.yml + sed -i '/zkevm.sequencer-non-empty-batch-seal-time:/d' templates/cdk-erigon/config.yml + + - name: Configure Kurtosis CDK + working-directory: ./kurtosis-cdk + run: | + /usr/local/bin/yq -i '.args.cdk_erigon_node_image = "jerrycgh/cdk-erigon:d5d04906f723f3f1d8c43c9e6baf3e18c27ff348"' params.yml + /usr/local/bin/yq -i '.args.cdk_node_image = "cdk:local"' params.yml + + - name: Deploy Kurtosis CDK package + working-directory: ./kurtosis-cdk + run: kurtosis run --enclave cdk-v1 --args-file params.yml --image-download always . 
+ + - name: Test resequence + working-directory: ./cdk-erigon + run: .github/scripts/test_resequence.sh + + - name: Prepare logs + if: always() + working-directory: ./kurtosis-cdk + run: | + mkdir -p ci_logs + cd ci_logs + kurtosis service logs cdk-v1 cdk-erigon-node-001 --all > cdk-erigon-node-001.log + kurtosis service logs cdk-v1 cdk-erigon-sequencer-001 --all > cdk-erigon-sequencer-001.log + kurtosis service logs cdk-v1 zkevm-agglayer-001 --all > zkevm-agglayer-001.log + kurtosis service logs cdk-v1 zkevm-prover-001 --all > zkevm-prover-001.log + kurtosis service logs cdk-v1 cdk-node-001 --all > cdk-node-001.log + kurtosis service logs cdk-v1 zkevm-bridge-service-001 --all > zkevm-bridge-service-001.log + + - name: Upload logs + if: always() + uses: actions/upload-artifact@v3 + with: + name: logs_${{ github.run_id }} + path: ./kurtosis-cdk/ci_logs diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index cefe1a56..2b90826e 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -69,6 +69,8 @@ type Aggregator struct { l1Syncr synchronizer.Synchronizer halted atomic.Bool + streamClientMutex *sync.Mutex + profitabilityChecker aggregatorTxProfitabilityChecker timeSendFinalProof time.Time timeCleanupLockedProofs types.Duration @@ -178,6 +180,7 @@ func New( etherman: etherman, ethTxManager: ethTxManager, streamClient: streamClient, + streamClientMutex: &sync.Mutex{}, l1Syncr: l1Syncr, profitabilityChecker: profitabilityChecker, stateDBMutex: &sync.Mutex{}, @@ -204,6 +207,14 @@ func New( return a, nil } +func (a *Aggregator) resetCurrentBatchData() { + a.currentBatchStreamData = []byte{} + a.currentStreamBatchRaw = state.BatchRawV2{ + Blocks: make([]state.L2BlockRaw, 0), + } + a.currentStreamL2Block = state.L2BlockRaw{} +} + func (a *Aggregator) retrieveWitness() { var success bool for { @@ -253,7 +264,7 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { // Halt the aggregator a.halted.Store(true) for { - 
log.Warnf("Halting the aggregator due to a L1 reorg. Reorged data has been deleted so it is safe to manually restart the aggregator.") + log.Errorf("Halting the aggregator due to a L1 reorg. Reorged data has been deleted so it is safe to manually restart the aggregator.") time.Sleep(10 * time.Second) // nolint:gomnd } } @@ -261,12 +272,24 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBatchesData) { log.Warnf("Rollback batches event, rollbackBatchesData: %+v", rollbackData) - // Stop Reading the data stream - err := a.streamClient.ExecCommandStop() - if err != nil { - log.Errorf("failed to stop data stream: %v", err) - } else { - log.Info("Data stream client stopped") + a.streamClientMutex.Lock() + defer a.streamClientMutex.Unlock() + + dsClientWasRunning := a.streamClient.IsStarted() + + var err error + + if dsClientWasRunning { + // Disable the process entry function to avoid processing the data stream + a.streamClient.ResetProcessEntryFunc() + + // Stop Reading the data stream + err = a.streamClient.ExecCommandStop() + if err != nil { + log.Errorf("failed to stop data stream: %v.", err) + } else { + log.Info("Data stream client stopped") + } } // Get new last verified batch number from L1 @@ -324,8 +347,13 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat } if err == nil { + // Reset current batch data previously read from the data stream + a.resetCurrentBatchData() + a.currentStreamBatch = state.Batch{} + log.Info("Current batch data reset") + var marshalledBookMark []byte - // Resume reading the data stream + // Reset the data stream reading point bookMark := &datastream.BookMark{ Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH, Value: rollbackData.LastBatchNumber + 1, @@ -335,20 +363,32 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat if err != nil { log.Error("failed to marshal 
bookmark: %v", err) } else { - err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark) - if err != nil { - log.Errorf("failed to connect to data stream: %v", err) + // Restart the stream client if needed + if dsClientWasRunning { + a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream) + err = a.streamClient.Start() + if err != nil { + log.Errorf("failed to start stream client, error: %v", err) + } else { + // Resume data stream reading + err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark) + if err != nil { + log.Errorf("failed to connect to data stream: %v", err) + } + log.Info("Data stream client resumed") + } } - log.Info("Data stream client resumed") } } if err == nil { log.Info("Handling rollback batches event finished successfully") } else { + // Halt the aggregator + a.halted.Store(true) for { - log.Errorf("Error handling rollback batches event: %v", err) - time.Sleep(a.cfg.RetryTime.Duration) + log.Errorf("Halting the aggregator due to an error handling rollback batches event: %v", err) + time.Sleep(10 * time.Second) // nolint:gomnd } } } @@ -521,11 +561,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli } // Reset current batch data - a.currentBatchStreamData = []byte{} - a.currentStreamBatchRaw = state.BatchRawV2{ - Blocks: make([]state.L2BlockRaw, 0), - } - a.currentStreamL2Block = state.L2BlockRaw{} + a.resetCurrentBatchData() case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK): // Add previous block (if any) to the current batch @@ -653,6 +689,9 @@ func (a *Aggregator) Start() error { } // Start stream client + a.streamClientMutex.Lock() + defer a.streamClientMutex.Unlock() + err = a.streamClient.Start() if err != nil { log.Fatalf("failed to start stream client, error: %v", err) diff --git a/go.mod b/go.mod index eb37abe8..4edb8870 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/0xPolygon/cdk-contracts-tooling v0.0.0-20240819092536-5a65d4761b2f 
github.com/0xPolygon/cdk-data-availability v0.0.9 github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d - github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3 + github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6 github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234 github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0 github.com/ethereum/go-ethereum v1.14.5 @@ -26,6 +26,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/crypto v0.24.0 golang.org/x/net v0.26.0 + golang.org/x/sync v0.7.0 google.golang.org/grpc v1.64.0 google.golang.org/protobuf v1.34.2 ) @@ -140,7 +141,6 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/go.sum b/go.sum index cb93abd1..c6e1fea5 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ github.com/0xPolygon/cdk-data-availability v0.0.9 h1:KkP+hJH9nY5fljpSNtW2pfP5YQC github.com/0xPolygon/cdk-data-availability v0.0.9/go.mod h1:5A+CU4FGeyG8zTDJc0khMzRxPzFzmpRydeEWmLztgh4= github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d h1:sxh6hZ2jF/sxxj2jd5o1vuNNCZjYmn4aRG9SRlVaEFs= github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d/go.mod h1:2scWqMMufrQXu7TikDgQ3BsyaKoX8qP26D6E262vSOg= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3 h1:zJ06KCGLMDOap4slop/QmiMUO+VPsKSS3+944SY06ww= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3/go.mod h1:bv7DjATsczN2WvFt26jv34TWv6rfvYM1SqegrgrFwfI= +github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6 h1:BSO1uu6dmLQ5kKb3uyDvsUxbnIoyumKvlwr0OtpTYMo= +github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6/go.mod h1:RC6ouyNsUtJrv5aGPcM6Dm5xhXN209tRSzcsJsaOtZI= github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234 
h1:QElCysO7f2xaknY/RDjxcs7IVmcgORfsCX2g+YD0Ko4= github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234/go.mod h1:zBZWxwOHKlw+ghd9roQLgIkDZWA7e7qO3EsfQQT/+oQ= github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0 h1:h/B5AzWSZTxb1HouulXeE9nbHD1d4/nc67ZQc0khAQA=