From 6798ab1047ca2fa7bed4e3d78bc8162a412e58ed Mon Sep 17 00:00:00 2001 From: omritoptix Date: Tue, 18 Jul 2023 19:36:29 +0200 Subject: [PATCH] Fixed a bug where a possible race condition could occur between settlement latestHeight and manager batch submission. --- block/manager.go | 4 ++-- da/celestia/celestia_test.go | 2 +- node/node_test.go | 4 ++-- settlement/base.go | 12 +++++++++++- settlement/events.go | 17 ++++++++++++----- 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/block/manager.go b/block/manager.go index 0d3eba315..649a1d432 100644 --- a/block/manager.go +++ b/block/manager.go @@ -350,7 +350,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) { // for aggregator: get notification that batch has been accepted so can send next batch. func (m *Manager) SyncTargetLoop(ctx context.Context) { m.logger.Info("Started sync target loop") - subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted) + subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewBatchAccepted) if err != nil { m.logger.Error("failed to subscribe to state update events") panic(err) @@ -368,7 +368,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) { return case event := <-subscription.Out(): m.logger.Info("Received state update event", "eventData", event.Data()) - eventData := event.Data().(*settlement.EventDataNewSettlementBatchAccepted) + eventData := event.Data().(*settlement.EventDataNewBatchAccepted) m.updateSyncParams(ctx, eventData.EndHeight) // In case we are the aggregator and we've got an update, then we can stop blocking from // the next batches to be published. For non-aggregators this is not needed. 
diff --git a/da/celestia/celestia_test.go b/da/celestia/celestia_test.go index 849272abc..6cdd4e70c 100644 --- a/da/celestia/celestia_test.go +++ b/da/celestia/celestia_test.go @@ -146,7 +146,7 @@ func TestSubmitBatch(t *testing.T) { err = dalc.Stop() require.NoError(err) // Wait for the goroutines to finish before accessing the mock calls - time.Sleep(1 * time.Second) + time.Sleep(3 * time.Second) t.Log("Verifying mock calls") assert.GreaterOrEqual(testutil.CountMockCalls(mockCNCClient.Calls, submitPFDFuncName), tc.expectedSubmitPFDMinCalls) } diff --git a/node/node_test.go b/node/node_test.go index acae0dd75..1259ba052 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -117,7 +117,7 @@ func TestHealthStatusEventHandler(t *testing.T) { err = node.Start() assert.NoError(err) // wait for node to start - time.Sleep(100 * time.Millisecond) + time.Sleep(1 * time.Second) slUnealthyError := errors.New("settlement layer is unhealthy") daUnealthyError := errors.New("da layer is unhealthy") @@ -184,7 +184,7 @@ func TestHealthStatusEventHandler(t *testing.T) { assert.Equal(c.expectedError, healthStatusEvent.Error) done <- true break - case <-time.After(500 * time.Millisecond): + case <-time.After(5 * time.Second): if c.expectHealthStatusEventEmitted { t.Error("expected health status event but didn't get one") } diff --git a/settlement/base.go b/settlement/base.go index ae6b3a832..f2e4219f4 100644 --- a/settlement/base.go +++ b/settlement/base.go @@ -8,6 +8,7 @@ import ( "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/log" "github.com/dymensionxyz/dymint/types" + "github.com/dymensionxyz/dymint/utils" "github.com/tendermint/tendermint/libs/pubsub" ) @@ -148,7 +149,7 @@ func (b *BaseLayerClient) fetchSequencersList() ([]*types.Sequencer, error) { func (b *BaseLayerClient) validateBatch(batch *types.Batch) error { if batch.StartHeight != atomic.LoadUint64(&b.latestHeight)+1 { - return fmt.Errorf("batch start height must be last height. 
StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight)+1) + return fmt.Errorf("batch start height != latest height + 1. StartHeight %d, latestHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight)) } if batch.EndHeight < batch.StartHeight { return fmt.Errorf("batch end height must be greater than start height. EndHeight %d, StartHeight %d", batch.EndHeight, batch.StartHeight) @@ -164,16 +165,25 @@ func (b *BaseLayerClient) stateUpdatesHandler(ready chan bool) { panic(err) } ready <- true + b.logger.Info("state updates handler ready") for { select { case event := <-subscription.Out(): b.logger.Debug("received state update event", "eventData", event.Data()) eventData := event.Data().(*EventDataNewSettlementBatchAccepted) atomic.StoreUint64(&b.latestHeight, eventData.EndHeight) + // Emit the new batch event only after latestHeight has been stored above + newBatchEventData := &EventDataNewBatchAccepted{ + EndHeight: eventData.EndHeight, + StateIndex: eventData.StateIndex, + } + utils.SubmitEventOrPanic(b.ctx, b.pubsub, newBatchEventData, + map[string][]string{EventTypeKey: {EventNewBatchAccepted}}) case <-subscription.Cancelled(): b.logger.Info("subscription canceled") return case <-b.ctx.Done(): + b.logger.Info("Context done. 
Exiting state update handler") return } } diff --git a/settlement/events.go b/settlement/events.go index 36de7123e..1993a671b 100644 --- a/settlement/events.go +++ b/settlement/events.go @@ -16,19 +16,25 @@ const ( // Define the event types const ( - EventNewSettlementBatchAccepted = "NewSettlementBatchAccepted" - EventSequencersListUpdated = "SequencersListUpdated" - EventSettlementHealthStatus = "SettlementHealthStatus" + // This event should be emitted internally in order to communicate between the settlement layer and the hub client + EventNewSettlementBatchAccepted = "EventNewSettlementBatchAccepted" + // This event should be emitted externally when a batch is accepted + EventNewBatchAccepted = "EventNewBatchAccepted" + EventSequencersListUpdated = "SequencersListUpdated" + EventSettlementHealthStatus = "SettlementHealthStatus" ) -// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted -type EventDataNewSettlementBatchAccepted struct { +// EventDataNewBatchAccepted defines the structure of the event data for the EventNewBatchAccepted +type EventDataNewBatchAccepted struct { // EndHeight is the height of the last accepted batch EndHeight uint64 // StateIndex is the rollapp-specific index the batch was saved in the SL StateIndex uint64 } +// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted +type EventDataNewSettlementBatchAccepted EventDataNewBatchAccepted + // EventDataSequencersListUpdated defines the structure of the event data for the EventSequencersListUpdated type EventDataSequencersListUpdated struct { // Sequencers is the list of sequencers @@ -46,6 +52,7 @@ type EventDataSettlementHealthStatus struct { // Define queries var ( EventQueryNewSettlementBatchAccepted = QueryForEvent(EventNewSettlementBatchAccepted) + EventQueryNewBatchAccepted = QueryForEvent(EventNewBatchAccepted) EventQuerySettlementHealthStatus = 
QueryForEvent(EventSettlementHealthStatus) )