From 9feaced004a63c98d10a1cd89571a364da413f5f Mon Sep 17 00:00:00 2001 From: Omri Date: Wed, 19 Jul 2023 16:49:37 +0200 Subject: [PATCH] fix: possible race condition with small batches upon batch submission (#410) --- block/manager.go | 4 ++-- da/celestia/celestia_test.go | 2 +- node/node_test.go | 7 +++++-- settlement/base.go | 11 ++++++++++- settlement/events.go | 17 ++++++++++++----- 5 files changed, 30 insertions(+), 11 deletions(-) diff --git a/block/manager.go b/block/manager.go index 0d3eba315..649a1d432 100644 --- a/block/manager.go +++ b/block/manager.go @@ -350,7 +350,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) { // for aggregator: get notification that batch has been accepted so can send next batch. func (m *Manager) SyncTargetLoop(ctx context.Context) { m.logger.Info("Started sync target loop") - subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted) + subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewBatchAccepted) if err != nil { m.logger.Error("failed to subscribe to state update events") panic(err) @@ -368,7 +368,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) { return case event := <-subscription.Out(): m.logger.Info("Received state update event", "eventData", event.Data()) - eventData := event.Data().(*settlement.EventDataNewSettlementBatchAccepted) + eventData := event.Data().(*settlement.EventDataNewBatchAccepted) m.updateSyncParams(ctx, eventData.EndHeight) // In case we are the aggregator and we've got an update, then we can stop blocking from // the next batches to be published. For non-aggregators this is not needed. diff --git a/da/celestia/celestia_test.go b/da/celestia/celestia_test.go index 849272abc..6cdd4e70c 100644 --- a/da/celestia/celestia_test.go +++ b/da/celestia/celestia_test.go @@ -146,7 +146,7 @@ func TestSubmitBatch(t *testing.T) { err = dalc.Stop() require.NoError(err) // Wait for the goroutines to finish before accessing the mock calls - time.Sleep(1 * time.Second) + time.Sleep(3 * time.Second) t.Log("Verifying mock calls") assert.GreaterOrEqual(testutil.CountMockCalls(mockCNCClient.Calls, submitPFDFuncName), tc.expectedSubmitPFDMinCalls) } diff --git a/node/node_test.go b/node/node_test.go index acae0dd75..df42c3946 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -117,7 +117,7 @@ func TestHealthStatusEventHandler(t *testing.T) { err = node.Start() assert.NoError(err) // wait for node to start - time.Sleep(100 * time.Millisecond) + time.Sleep(1 * time.Second) slUnealthyError := errors.New("settlement layer is unhealthy") daUnealthyError := errors.New("da layer is unhealthy") @@ -171,8 +171,10 @@ func TestHealthStatusEventHandler(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { done := make(chan bool, 1) + ready := make(chan bool, 1) go func() { HealthSubscription, err := node.pubsubServer.Subscribe(node.ctx, c.name, events.EventQueryHealthStatus) + ready <- true assert.NoError(err) select { case event := <-HealthSubscription.Out(): @@ -184,7 +186,7 @@ func TestHealthStatusEventHandler(t *testing.T) { assert.Equal(c.expectedError, healthStatusEvent.Error) done <- true break - case <-time.After(500 * time.Millisecond): + case <-time.After(1 * time.Second): if c.expectHealthStatusEventEmitted { t.Error("expected health status event but didn't get one") } @@ -192,6 +194,7 @@ func TestHealthStatusEventHandler(t *testing.T) { break } }() + <-ready // Emit an event. node.pubsubServer.PublishWithEvents(context.Background(), c.baseLayerHealthStatusEventData, c.baseLayerHealthStatusEvent) <-done diff --git a/settlement/base.go b/settlement/base.go index ae6b3a832..98354e472 100644 --- a/settlement/base.go +++ b/settlement/base.go @@ -8,6 +8,7 @@ import ( "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/log" "github.com/dymensionxyz/dymint/types" + "github.com/dymensionxyz/dymint/utils" "github.com/tendermint/tendermint/libs/pubsub" ) @@ -148,7 +149,7 @@ func (b *BaseLayerClient) fetchSequencersList() ([]*types.Sequencer, error) { func (b *BaseLayerClient) validateBatch(batch *types.Batch) error { if batch.StartHeight != atomic.LoadUint64(&b.latestHeight)+1 { - return fmt.Errorf("batch start height must be last height. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight)+1) + return fmt.Errorf("batch start height != latest height + 1. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight)) } if batch.EndHeight < batch.StartHeight { return fmt.Errorf("batch end height must be greater than start height. EndHeight %d, StartHeight %d", batch.EndHeight, batch.StartHeight) @@ -170,10 +171,18 @@ func (b *BaseLayerClient) stateUpdatesHandler(ready chan bool) { b.logger.Debug("received state update event", "eventData", event.Data()) eventData := event.Data().(*EventDataNewSettlementBatchAccepted) atomic.StoreUint64(&b.latestHeight, eventData.EndHeight) + // Emit new batch event + newBatchEventData := &EventDataNewBatchAccepted{ + EndHeight: eventData.EndHeight, + StateIndex: eventData.StateIndex, + } + utils.SubmitEventOrPanic(b.ctx, b.pubsub, newBatchEventData, + map[string][]string{EventTypeKey: {EventNewBatchAccepted}}) case <-subscription.Cancelled(): b.logger.Info("subscription canceled") return case <-b.ctx.Done(): + b.logger.Info("Context done. Exiting state update handler") return } } diff --git a/settlement/events.go b/settlement/events.go index 36de7123e..1993a671b 100644 --- a/settlement/events.go +++ b/settlement/events.go @@ -16,19 +16,25 @@ const ( // Define the event types const ( - EventNewSettlementBatchAccepted = "NewSettlementBatchAccepted" - EventSequencersListUpdated = "SequencersListUpdated" - EventSettlementHealthStatus = "SettlementHealthStatus" + // This event should be emitted internally in order to communicate between the settlement layer and the hub client + EventNewSettlementBatchAccepted = "EventNewSettlementBatchAccepted" + // This event should be emitted externally when a batch is accepted + EventNewBatchAccepted = "EventNewBatchAccepted" + EventSequencersListUpdated = "SequencersListUpdated" + EventSettlementHealthStatus = "SettlementHealthStatus" ) -// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted -type EventDataNewSettlementBatchAccepted struct { +// EventDataNewBatchAccepted defines the structure of the event data for the EventNewBatchAccepted +type EventDataNewBatchAccepted struct { // EndHeight is the height of the last accepted batch EndHeight uint64 // StateIndex is the rollapp-specific index the batch was saved in the SL StateIndex uint64 } +// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted +type EventDataNewSettlementBatchAccepted EventDataNewBatchAccepted + // EventDataSequencersListUpdated defines the structure of the event data for the EventSequencersListUpdated type EventDataSequencersListUpdated struct { // Sequencers is the list of sequencers @@ -46,6 +52,7 @@ type EventDataSettlementHealthStatus struct { // Define queries var ( EventQueryNewSettlementBatchAccepted = QueryForEvent(EventNewSettlementBatchAccepted) + EventQueryNewBatchAccepted = QueryForEvent(EventNewBatchAccepted) EventQuerySettlementHealthStatus = QueryForEvent(EventSettlementHealthStatus) )