Skip to content

Commit

Permalink
fix: possible race condition with small batches upon batch submission (
Browse files Browse the repository at this point in the history
  • Loading branch information
omritoptix authored Jul 19, 2023
1 parent 93642b5 commit 9feaced
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 11 deletions.
4 changes: 2 additions & 2 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
// for aggregator: get notification that batch has been accepted so can send next batch.
func (m *Manager) SyncTargetLoop(ctx context.Context) {
m.logger.Info("Started sync target loop")
subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted)
subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewBatchAccepted)
if err != nil {
m.logger.Error("failed to subscribe to state update events")
panic(err)
Expand All @@ -368,7 +368,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
return
case event := <-subscription.Out():
m.logger.Info("Received state update event", "eventData", event.Data())
eventData := event.Data().(*settlement.EventDataNewSettlementBatchAccepted)
eventData := event.Data().(*settlement.EventDataNewBatchAccepted)
m.updateSyncParams(ctx, eventData.EndHeight)
// In case we are the aggregator and we've got an update, then we can stop blocking from
// the next batches to be published. For non-aggregators this is not needed.
Expand Down
2 changes: 1 addition & 1 deletion da/celestia/celestia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestSubmitBatch(t *testing.T) {
err = dalc.Stop()
require.NoError(err)
// Wait for the goroutines to finish before accessing the mock calls
time.Sleep(1 * time.Second)
time.Sleep(3 * time.Second)
t.Log("Verifying mock calls")
assert.GreaterOrEqual(testutil.CountMockCalls(mockCNCClient.Calls, submitPFDFuncName), tc.expectedSubmitPFDMinCalls)
}
Expand Down
7 changes: 5 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestHealthStatusEventHandler(t *testing.T) {
err = node.Start()
assert.NoError(err)
// wait for node to start
time.Sleep(100 * time.Millisecond)
time.Sleep(1 * time.Second)

slUnealthyError := errors.New("settlement layer is unhealthy")
daUnealthyError := errors.New("da layer is unhealthy")
Expand Down Expand Up @@ -171,8 +171,10 @@ func TestHealthStatusEventHandler(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
done := make(chan bool, 1)
ready := make(chan bool, 1)
go func() {
HealthSubscription, err := node.pubsubServer.Subscribe(node.ctx, c.name, events.EventQueryHealthStatus)
ready <- true
assert.NoError(err)
select {
case event := <-HealthSubscription.Out():
Expand All @@ -184,14 +186,15 @@ func TestHealthStatusEventHandler(t *testing.T) {
assert.Equal(c.expectedError, healthStatusEvent.Error)
done <- true
break
case <-time.After(500 * time.Millisecond):
case <-time.After(1 * time.Second):
if c.expectHealthStatusEventEmitted {
t.Error("expected health status event but didn't get one")
}
done <- true
break
}
}()
<-ready
// Emit an event.
node.pubsubServer.PublishWithEvents(context.Background(), c.baseLayerHealthStatusEventData, c.baseLayerHealthStatusEvent)
<-done
Expand Down
11 changes: 10 additions & 1 deletion settlement/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/log"
"github.com/dymensionxyz/dymint/types"
"github.com/dymensionxyz/dymint/utils"
"github.com/tendermint/tendermint/libs/pubsub"
)

Expand Down Expand Up @@ -148,7 +149,7 @@ func (b *BaseLayerClient) fetchSequencersList() ([]*types.Sequencer, error) {

func (b *BaseLayerClient) validateBatch(batch *types.Batch) error {
if batch.StartHeight != atomic.LoadUint64(&b.latestHeight)+1 {
return fmt.Errorf("batch start height must be last height. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight)+1)
return fmt.Errorf("batch start height != latest height + 1. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight))
}
if batch.EndHeight < batch.StartHeight {
return fmt.Errorf("batch end height must be greater than start height. EndHeight %d, StartHeight %d", batch.EndHeight, batch.StartHeight)
Expand All @@ -170,10 +171,18 @@ func (b *BaseLayerClient) stateUpdatesHandler(ready chan bool) {
b.logger.Debug("received state update event", "eventData", event.Data())
eventData := event.Data().(*EventDataNewSettlementBatchAccepted)
atomic.StoreUint64(&b.latestHeight, eventData.EndHeight)
// Emit new batch event
newBatchEventData := &EventDataNewBatchAccepted{
EndHeight: eventData.EndHeight,
StateIndex: eventData.StateIndex,
}
utils.SubmitEventOrPanic(b.ctx, b.pubsub, newBatchEventData,
map[string][]string{EventTypeKey: {EventNewBatchAccepted}})
case <-subscription.Cancelled():
b.logger.Info("subscription canceled")
return
case <-b.ctx.Done():
b.logger.Info("Context done. Exiting state update handler")
return
}
}
Expand Down
17 changes: 12 additions & 5 deletions settlement/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@ const (

// Define the event types
const (
EventNewSettlementBatchAccepted = "NewSettlementBatchAccepted"
EventSequencersListUpdated = "SequencersListUpdated"
EventSettlementHealthStatus = "SettlementHealthStatus"
// This event should be emitted internally in order to communicate between the settlement layer and the hub client
EventNewSettlementBatchAccepted = "EventNewSettlementBatchAccepted"
// This event should be emitted externally when a batch is accepted
EventNewBatchAccepted = "EventNewBatchAccepted"
EventSequencersListUpdated = "SequencersListUpdated"
EventSettlementHealthStatus = "SettlementHealthStatus"
)

// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted
type EventDataNewSettlementBatchAccepted struct {
// EventDataNewBatchAccepted defines the structure of the event data for the EventNewBatchAccepted
type EventDataNewBatchAccepted struct {
// EndHeight is the height of the last accepted batch
EndHeight uint64
// StateIndex is the rollapp-specific index the batch was saved in the SL
StateIndex uint64
}

// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted
type EventDataNewSettlementBatchAccepted EventDataNewBatchAccepted

// EventDataSequencersListUpdated defines the structure of the event data for the EventSequencersListUpdated
type EventDataSequencersListUpdated struct {
// Sequencers is the list of sequencers
Expand All @@ -46,6 +52,7 @@ type EventDataSettlementHealthStatus struct {
// Define queries
var (
EventQueryNewSettlementBatchAccepted = QueryForEvent(EventNewSettlementBatchAccepted)
EventQueryNewBatchAccepted = QueryForEvent(EventNewBatchAccepted)
EventQuerySettlementHealthStatus = QueryForEvent(EventSettlementHealthStatus)
)

Expand Down

0 comments on commit 9feaced

Please sign in to comment.