Skip to content

Commit

Permalink
fix: restart DS client (#53)
Browse files Browse the repository at this point in the history
* feat: restart DS client
* feat: reset callback function
* feat: enable and disable callback function
* feat: remove cb nil function
* feat: check DS is running
* Add github workflow for resequence
* Always upload logs
* feat: update lib
* feat: halt
* feat: update lib release
* feat: resetCurrentBatchData
* feat: disable long running test

---------

Co-authored-by: Jerry <[email protected]>
  • Loading branch information
ToniRamirezM and cffls committed Sep 2, 2024
1 parent eaafcc1 commit 6113321
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 23 deletions.
100 changes: 100 additions & 0 deletions .github/workflows/test-resequence.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
name: Resequence test
on:
push:
branches:
# Disable test for the moment as it takes too long
- "this-test-is-disabled"


concurrency:
group: ${{ github.ref }}
cancel-in-progress: true

jobs:
Resequence:
runs-on: ubuntu-latest
# TODO: Add "cdk-validium" once it's ready
# strategy:
# matrix:
# da-mode: [ "rollup" ]
steps:
- name: Checkout cdk
uses: actions/checkout@v4
with:
path: cdk

- name: Checkout kurtosis-cdk
uses: actions/checkout@v4
with:
repository: 0xPolygonHermez/cdk-erigon
ref: banana
path: cdk-erigon

- name: Checkout kurtosis-cdk
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
ref: 3debe0a4dd000e02f7e6bde3247432211bf0336f
path: kurtosis-cdk

- name: Install Kurtosis CDK tools
uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk

- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1

- name: Install yq
run: |
sudo curl -L https://github.com/mikefarah/yq/releases/download/v4.44.2/yq_linux_amd64 -o /usr/local/bin/yq
sudo chmod +x /usr/local/bin/yq
/usr/local/bin/yq --version
- name: Install polycli
run: |
tmp_dir=$(mktemp -d) && curl -L https://github.com/0xPolygon/polygon-cli/releases/download/v0.1.48/polycli_v0.1.48_linux_amd64.tar.gz | tar -xz -C "$tmp_dir" && mv "$tmp_dir"/* /usr/local/bin/polycli && rm -rf "$tmp_dir"
sudo chmod +x /usr/local/bin/polycli
/usr/local/bin/polycli version
- name: Build docker image
working-directory: ./cdk
run: docker build -t cdk:local --file Dockerfile .

- name: Remove unused flags
working-directory: ./kurtosis-cdk
run: |
sed -i '/zkevm.sequencer-batch-seal-time:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.sequencer-non-empty-batch-seal-time:/d' templates/cdk-erigon/config.yml
- name: Configure Kurtosis CDK
working-directory: ./kurtosis-cdk
run: |
/usr/local/bin/yq -i '.args.cdk_erigon_node_image = "jerrycgh/cdk-erigon:d5d04906f723f3f1d8c43c9e6baf3e18c27ff348"' params.yml
/usr/local/bin/yq -i '.args.cdk_node_image = "cdk:local"' params.yml
- name: Deploy Kurtosis CDK package
working-directory: ./kurtosis-cdk
run: kurtosis run --enclave cdk-v1 --args-file params.yml --image-download always .

- name: Test resequence
working-directory: ./cdk-erigon
run: .github/scripts/test_resequence.sh

- name: Prepare logs
if: always()
working-directory: ./kurtosis-cdk
run: |
mkdir -p ci_logs
cd ci_logs
kurtosis service logs cdk-v1 cdk-erigon-node-001 --all > cdk-erigon-node-001.log
kurtosis service logs cdk-v1 cdk-erigon-sequencer-001 --all > cdk-erigon-sequencer-001.log
kurtosis service logs cdk-v1 zkevm-agglayer-001 --all > zkevm-agglayer-001.log
kurtosis service logs cdk-v1 zkevm-prover-001 --all > zkevm-prover-001.log
kurtosis service logs cdk-v1 cdk-node-001 --all > cdk-node-001.log
kurtosis service logs cdk-v1 zkevm-bridge-service-001 --all > zkevm-bridge-service-001.log
- name: Upload logs
if: always()
uses: actions/upload-artifact@v3
with:
name: logs_${{ github.run_id }}
path: ./kurtosis-cdk/ci_logs
77 changes: 58 additions & 19 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type Aggregator struct {
l1Syncr synchronizer.Synchronizer
halted atomic.Bool

streamClientMutex *sync.Mutex

profitabilityChecker aggregatorTxProfitabilityChecker
timeSendFinalProof time.Time
timeCleanupLockedProofs types.Duration
Expand Down Expand Up @@ -178,6 +180,7 @@ func New(
etherman: etherman,
ethTxManager: ethTxManager,
streamClient: streamClient,
streamClientMutex: &sync.Mutex{},
l1Syncr: l1Syncr,
profitabilityChecker: profitabilityChecker,
stateDBMutex: &sync.Mutex{},
Expand All @@ -204,6 +207,14 @@ func New(
return a, nil
}

func (a *Aggregator) resetCurrentBatchData() {
a.currentBatchStreamData = []byte{}
a.currentStreamBatchRaw = state.BatchRawV2{
Blocks: make([]state.L2BlockRaw, 0),
}
a.currentStreamL2Block = state.L2BlockRaw{}
}

func (a *Aggregator) retrieveWitness() {
var success bool
for {
Expand Down Expand Up @@ -253,20 +264,32 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) {
// Halt the aggregator
a.halted.Store(true)
for {
log.Warnf("Halting the aggregator due to a L1 reorg. Reorged data has been deleted so it is safe to manually restart the aggregator.")
log.Errorf("Halting the aggregator due to a L1 reorg. Reorged data has been deleted so it is safe to manually restart the aggregator.")
time.Sleep(10 * time.Second) // nolint:gomnd
}
}

func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBatchesData) {
log.Warnf("Rollback batches event, rollbackBatchesData: %+v", rollbackData)

// Stop Reading the data stream
err := a.streamClient.ExecCommandStop()
if err != nil {
log.Errorf("failed to stop data stream: %v", err)
} else {
log.Info("Data stream client stopped")
a.streamClientMutex.Lock()
defer a.streamClientMutex.Unlock()

dsClientWasRunning := a.streamClient.IsStarted()

var err error

if dsClientWasRunning {
// Disable the process entry function to avoid processing the data stream
a.streamClient.ResetProcessEntryFunc()

// Stop Reading the data stream
err = a.streamClient.ExecCommandStop()
if err != nil {
log.Errorf("failed to stop data stream: %v.", err)
} else {
log.Info("Data stream client stopped")
}
}

// Get new last verified batch number from L1
Expand Down Expand Up @@ -324,8 +347,13 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat
}

if err == nil {
// Reset current batch data previously read from the data stream
a.resetCurrentBatchData()
a.currentStreamBatch = state.Batch{}
log.Info("Current batch data reset")

var marshalledBookMark []byte
// Resume reading the data stream
// Reset the data stream reading point
bookMark := &datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: rollbackData.LastBatchNumber + 1,
Expand All @@ -335,20 +363,32 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat
if err != nil {
log.Error("failed to marshal bookmark: %v", err)
} else {
err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark)
if err != nil {
log.Errorf("failed to connect to data stream: %v", err)
// Restart the stream client if needed
if dsClientWasRunning {
a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream)
err = a.streamClient.Start()
if err != nil {
log.Errorf("failed to start stream client, error: %v", err)
} else {
// Resume data stream reading
err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark)
if err != nil {
log.Errorf("failed to connect to data stream: %v", err)
}
log.Info("Data stream client resumed")
}
}
log.Info("Data stream client resumed")
}
}

if err == nil {
log.Info("Handling rollback batches event finished successfully")
} else {
// Halt the aggregator
a.halted.Store(true)
for {
log.Errorf("Error handling rollback batches event: %v", err)
time.Sleep(a.cfg.RetryTime.Duration)
log.Errorf("Halting the aggregator due to an error handling rollback batches event: %v", err)
time.Sleep(10 * time.Second) // nolint:gomnd
}
}
}
Expand Down Expand Up @@ -521,11 +561,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli
}

// Reset current batch data
a.currentBatchStreamData = []byte{}
a.currentStreamBatchRaw = state.BatchRawV2{
Blocks: make([]state.L2BlockRaw, 0),
}
a.currentStreamL2Block = state.L2BlockRaw{}
a.resetCurrentBatchData()

case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK):
// Add previous block (if any) to the current batch
Expand Down Expand Up @@ -653,6 +689,9 @@ func (a *Aggregator) Start() error {
}

// Start stream client
a.streamClientMutex.Lock()
defer a.streamClientMutex.Unlock()

err = a.streamClient.Start()
if err != nil {
log.Fatalf("failed to start stream client, error: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/0xPolygon/cdk-contracts-tooling v0.0.0-20240819092536-5a65d4761b2f
github.com/0xPolygon/cdk-data-availability v0.0.9
github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6
github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0
github.com/ethereum/go-ethereum v1.14.5
Expand All @@ -26,6 +26,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.24.0
golang.org/x/net v0.26.0
golang.org/x/sync v0.7.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.2
)
Expand Down Expand Up @@ -140,7 +141,6 @@ require (
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/0xPolygon/cdk-data-availability v0.0.9 h1:KkP+hJH9nY5fljpSNtW2pfP5YQC
github.com/0xPolygon/cdk-data-availability v0.0.9/go.mod h1:5A+CU4FGeyG8zTDJc0khMzRxPzFzmpRydeEWmLztgh4=
github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d h1:sxh6hZ2jF/sxxj2jd5o1vuNNCZjYmn4aRG9SRlVaEFs=
github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d/go.mod h1:2scWqMMufrQXu7TikDgQ3BsyaKoX8qP26D6E262vSOg=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3 h1:zJ06KCGLMDOap4slop/QmiMUO+VPsKSS3+944SY06ww=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3/go.mod h1:bv7DjATsczN2WvFt26jv34TWv6rfvYM1SqegrgrFwfI=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6 h1:BSO1uu6dmLQ5kKb3uyDvsUxbnIoyumKvlwr0OtpTYMo=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6/go.mod h1:RC6ouyNsUtJrv5aGPcM6Dm5xhXN209tRSzcsJsaOtZI=
github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234 h1:QElCysO7f2xaknY/RDjxcs7IVmcgORfsCX2g+YD0Ko4=
github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234/go.mod h1:zBZWxwOHKlw+ghd9roQLgIkDZWA7e7qO3EsfQQT/+oQ=
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.7.0 h1:h/B5AzWSZTxb1HouulXeE9nbHD1d4/nc67ZQc0khAQA=
Expand Down

0 comments on commit 6113321

Please sign in to comment.