Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: possible race condition with small batches upon batch submission #410

Merged
merged 3 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
// for aggregator: get notification that batch has been accepted so can send next batch.
func (m *Manager) SyncTargetLoop(ctx context.Context) {
m.logger.Info("Started sync target loop")
subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted)
subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewBatchAccepted)
if err != nil {
m.logger.Error("failed to subscribe to state update events")
panic(err)
Expand All @@ -368,7 +368,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
return
case event := <-subscription.Out():
m.logger.Info("Received state update event", "eventData", event.Data())
eventData := event.Data().(*settlement.EventDataNewSettlementBatchAccepted)
eventData := event.Data().(*settlement.EventDataNewBatchAccepted)
m.updateSyncParams(ctx, eventData.EndHeight)
// In case we are the aggregator and we've got an update, then we can stop blocking from
// the next batches to be published. For non-aggregators this is not needed.
Expand Down
2 changes: 1 addition & 1 deletion da/celestia/celestia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestSubmitBatch(t *testing.T) {
err = dalc.Stop()
require.NoError(err)
// Wait for the goroutines to finish before accessing the mock calls
time.Sleep(1 * time.Second)
time.Sleep(3 * time.Second)
t.Log("Verifying mock calls")
assert.GreaterOrEqual(testutil.CountMockCalls(mockCNCClient.Calls, submitPFDFuncName), tc.expectedSubmitPFDMinCalls)
}
Expand Down
4 changes: 2 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestHealthStatusEventHandler(t *testing.T) {
err = node.Start()
assert.NoError(err)
// wait for node to start
time.Sleep(100 * time.Millisecond)
time.Sleep(1 * time.Second)

slUnealthyError := errors.New("settlement layer is unhealthy")
daUnealthyError := errors.New("da layer is unhealthy")
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestHealthStatusEventHandler(t *testing.T) {
assert.Equal(c.expectedError, healthStatusEvent.Error)
done <- true
break
case <-time.After(500 * time.Millisecond):
case <-time.After(5 * time.Second):
if c.expectHealthStatusEventEmitted {
t.Error("expected health status event but didn't get one")
}
Expand Down
12 changes: 11 additions & 1 deletion settlement/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/log"
"github.com/dymensionxyz/dymint/types"
"github.com/dymensionxyz/dymint/utils"
"github.com/tendermint/tendermint/libs/pubsub"
)

Expand Down Expand Up @@ -148,7 +149,7 @@ func (b *BaseLayerClient) fetchSequencersList() ([]*types.Sequencer, error) {

func (b *BaseLayerClient) validateBatch(batch *types.Batch) error {
if batch.StartHeight != atomic.LoadUint64(&b.latestHeight)+1 {
return fmt.Errorf("batch start height must be last height. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight)+1)
return fmt.Errorf("batch start height != latest height + 1. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight))
}
if batch.EndHeight < batch.StartHeight {
return fmt.Errorf("batch end height must be greater than start height. EndHeight %d, StartHeight %d", batch.EndHeight, batch.StartHeight)
Expand All @@ -164,16 +165,25 @@ func (b *BaseLayerClient) stateUpdatesHandler(ready chan bool) {
panic(err)
}
ready <- true
b.logger.Info("state updates handler ready")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be probably debug
or better informative

for {
select {
case event := <-subscription.Out():
b.logger.Debug("received state update event", "eventData", event.Data())
eventData := event.Data().(*EventDataNewSettlementBatchAccepted)
atomic.StoreUint64(&b.latestHeight, eventData.EndHeight)
// Emit new batch event
newBatchEventData := &EventDataNewBatchAccepted{
EndHeight: eventData.EndHeight,
StateIndex: eventData.StateIndex,
}
utils.SubmitEventOrPanic(b.ctx, b.pubsub, newBatchEventData,
map[string][]string{EventTypeKey: {EventNewBatchAccepted}})
case <-subscription.Cancelled():
b.logger.Info("subscription canceled")
return
case <-b.ctx.Done():
b.logger.Info("Context done. Exiting state update handler")
return
}
}
Expand Down
17 changes: 12 additions & 5 deletions settlement/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@ const (

// Define the event types
const (
EventNewSettlementBatchAccepted = "NewSettlementBatchAccepted"
EventSequencersListUpdated = "SequencersListUpdated"
EventSettlementHealthStatus = "SettlementHealthStatus"
// This event should be emitted internally in order to communicate between the settlement layer and the hub client
EventNewSettlementBatchAccepted = "EventNewSettlementBatchAccepted"
// This event should be emitted externally when a batch is accepted
EventNewBatchAccepted = "EventNewBatchAccepted"
EventSequencersListUpdated = "SequencersListUpdated"
EventSettlementHealthStatus = "SettlementHealthStatus"
)

// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted
type EventDataNewSettlementBatchAccepted struct {
// EventDataNewBatchAccepted defines the structure of the event data for the EventNewBatchAccepted
type EventDataNewBatchAccepted struct {
// EndHeight is the height of the last accepted batch
EndHeight uint64
// StateIndex is the rollapp-specific index the batch was saved in the SL
StateIndex uint64
}

// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted
type EventDataNewSettlementBatchAccepted EventDataNewBatchAccepted

// EventDataSequencersListUpdated defines the structure of the event data for the EventSequencersListUpdated
type EventDataSequencersListUpdated struct {
// Sequencers is the list of sequencers
Expand All @@ -46,6 +52,7 @@ type EventDataSettlementHealthStatus struct {
// Define queries
var (
EventQueryNewSettlementBatchAccepted = QueryForEvent(EventNewSettlementBatchAccepted)
EventQueryNewBatchAccepted = QueryForEvent(EventNewBatchAccepted)
EventQuerySettlementHealthStatus = QueryForEvent(EventSettlementHealthStatus)
)

Expand Down