Skip to content

Commit

Permalink
Consensus threshold to 2f+1 for zk overflow chains (#1313)
Browse files Browse the repository at this point in the history
Consensus threshold to 2f+1 for zk overflow chains
  • Loading branch information
0xnogo authored Sep 1, 2024
1 parent 9952bca commit c8c7d92
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 18 deletions.
19 changes: 17 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker"
)

// Batching strategies
const (
BestEffortBatchingStrategyID = uint32(0)
ZKOverflowBatchingStrategyID = uint32(1)
)

type BatchContext struct {
report commitReportWithSendRequests
inflight []InflightInternalExecutionReport
Expand All @@ -47,6 +53,7 @@ type BatchContext struct {

type BatchingStrategy interface {
BuildBatch(ctx context.Context, batchCtx *BatchContext) ([]ccip.ObservedMessage, []messageExecStatus)
GetBatchingStrategyID() uint32
}

type BestEffortBatchingStrategy struct{}
Expand All @@ -58,9 +65,9 @@ type ZKOverflowBatchingStrategy struct {
func NewBatchingStrategy(batchingStrategyID uint32, statusChecker statuschecker.CCIPTransactionStatusChecker) (BatchingStrategy, error) {
var batchingStrategy BatchingStrategy
switch batchingStrategyID {
case 0:
case BestEffortBatchingStrategyID:
batchingStrategy = &BestEffortBatchingStrategy{}
case 1:
case ZKOverflowBatchingStrategyID:
batchingStrategy = &ZKOverflowBatchingStrategy{
statuschecker: statusChecker,
}
Expand All @@ -70,6 +77,10 @@ func NewBatchingStrategy(batchingStrategyID uint32, statusChecker statuschecker.
return batchingStrategy, nil
}

func (s *BestEffortBatchingStrategy) GetBatchingStrategyID() uint32 {
return BestEffortBatchingStrategyID
}

// BestEffortBatchingStrategy is a batching strategy that tries to batch as many messages as possible (up to certain limits).
func (s *BestEffortBatchingStrategy) BuildBatch(
ctx context.Context,
Expand All @@ -95,6 +106,10 @@ func (s *BestEffortBatchingStrategy) BuildBatch(
return batchBuilder.batch, batchBuilder.statuses
}

func (bs *ZKOverflowBatchingStrategy) GetBatchingStrategyID() uint32 {
return ZKOverflowBatchingStrategyID
}

// ZKOverflowBatchingStrategy is a batching strategy for ZK chains overflowing under certain conditions.
// It is a simple batching strategy that only allows one message to be added to the batch.
// TXM is used to perform the ZK check: if the message failed the check, it will be skipped.
Expand Down
11 changes: 9 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,13 +850,13 @@ func runBatchingStrategyTests(t *testing.T, strategy BatchingStrategy, available

seqNrs, execStates := strategy.BuildBatch(context.Background(), batchContext)

runAssertions(t, tc, seqNrs, execStates)
runAssertions(t, tc, seqNrs, execStates, strategy)
})
}
}

// Utility function to run common assertions
func runAssertions(t *testing.T, tc testCase, seqNrs []ccip.ObservedMessage, execStates []messageExecStatus) {
func runAssertions(t *testing.T, tc testCase, seqNrs []ccip.ObservedMessage, execStates []messageExecStatus, strategy BatchingStrategy) {
if tc.expectedSeqNrs == nil {
assert.Len(t, seqNrs, 0)
} else {
Expand All @@ -868,6 +868,13 @@ func runAssertions(t *testing.T, tc testCase, seqNrs []ccip.ObservedMessage, exe
} else {
assert.Equal(t, tc.expectedStates, execStates)
}

batchingStratID := strategy.GetBatchingStrategyID()
if strategyType := reflect.TypeOf(strategy); strategyType == reflect.TypeOf(&BestEffortBatchingStrategy{}) {
assert.Equal(t, batchingStratID, uint32(0))
} else {
assert.Equal(t, batchingStratID, uint32(1))
}
}

func createTestMessage(seqNr uint64, sender cciptypes.Address, nonce uint64, feeToken cciptypes.Address, feeAmount *big.Int, executed bool, data []byte) cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta {
Expand Down
23 changes: 19 additions & 4 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,21 +468,36 @@ func (r *ExecutionReportingPlugin) buildReport(ctx context.Context, lggr logger.
return encodedReport, nil
}

// Returns required number of observations to reach consensus
func (r *ExecutionReportingPlugin) getConsensusThreshold() int {
// Default consensus threshold is F+1
consensusThreshold := r.F + 1
if r.batchingStrategy.GetBatchingStrategyID() == ZKOverflowBatchingStrategyID {
// For batching strategy 1, consensus threshold is 2F+1
// This is because chains that can overflow need to reach consensus during the inflight cache period
// to avoid 2 transmissions round of an overflown message.
consensusThreshold = 2*r.F + 1
}
return consensusThreshold
}

func (r *ExecutionReportingPlugin) Report(ctx context.Context, timestamp types.ReportTimestamp, query types.Query, observations []types.AttributedObservation) (bool, types.Report, error) {
lggr := r.lggr.Named("ExecutionReport")
if healthy, err := r.chainHealthcheck.IsHealthy(ctx); err != nil {
return false, nil, err
} else if !healthy {
return false, nil, ccip.ErrChainIsNotHealthy
}
consensusThreshold := r.getConsensusThreshold()
lggr.Infof("Consensus threshold set to: %d", consensusThreshold)

parsableObservations := ccip.GetParsableObservations[ccip.ExecutionObservation](lggr, observations)
// Need at least F+1 observations
if len(parsableObservations) <= r.F {
lggr.Warn("Non-empty observations <= F, need at least F+1 to continue")
if len(parsableObservations) < consensusThreshold {
lggr.Warnf("Insufficient observations: only %d received, but need more than %d to proceed", len(parsableObservations), consensusThreshold)
return false, nil, nil
}

observedMessages, err := calculateObservedMessagesConsensus(parsableObservations, r.F)
observedMessages, err := calculateObservedMessagesConsensus(parsableObservations, consensusThreshold)
if err != nil {
return false, nil, err
}
Expand Down
75 changes: 65 additions & 10 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/testhelpers"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/statuschecker"
)

func TestExecutionReportingPlugin_Observation(t *testing.T) {
Expand Down Expand Up @@ -232,26 +233,43 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {

func TestExecutionReportingPlugin_Report(t *testing.T) {
testCases := []struct {
name string
f int
committedSeqNum uint64
observations []ccip.ExecutionObservation
name string
f int
batchingStrategyId uint32
committedSeqNum uint64
observations []ccip.ExecutionObservation

expectingSomeReport bool
expectedReport cciptypes.ExecReport
expectingSomeErr bool
}{
{
name: "not enough observations to form consensus",
f: 5,
committedSeqNum: 5,
name: "not enough observations to form consensus - best effort batching",
f: 5,
batchingStrategyId: 0,
committedSeqNum: 5,
observations: []ccip.ExecutionObservation{
{Messages: map[uint64]ccip.MsgData{3: {}, 4: {}}},
{Messages: map[uint64]ccip.MsgData{3: {}, 4: {}}},
},
expectingSomeErr: false,
expectingSomeReport: false,
},
{
name: "not enough observaitons to form consensus - zk batching",
f: 5,
batchingStrategyId: 1,
committedSeqNum: 5,
observations: []ccip.ExecutionObservation{
{Messages: map[uint64]ccip.MsgData{3: {}, 4: {}}},
{Messages: map[uint64]ccip.MsgData{3: {}, 4: {}}},
{Messages: map[uint64]ccip.MsgData{3: {}, 4: {}}},
{Messages: map[uint64]ccip.MsgData{3: {}, 4: {}}},
{Messages: map[uint64]ccip.MsgData{3: {}, 4: {}}},
{Messages: map[uint64]ccip.MsgData{3: {}, 4: {}}},
{Messages: map[uint64]ccip.MsgData{3: {}, 4: {}}},
},
},
{
name: "zero observations",
f: 0,
Expand All @@ -268,6 +286,9 @@ func TestExecutionReportingPlugin_Report(t *testing.T) {
p := ExecutionReportingPlugin{}
p.lggr = logger.TestLogger(t)
p.F = tc.f
bs, err := NewBatchingStrategy(tc.batchingStrategyId, &statuschecker.TxmStatusChecker{})
assert.NoError(t, err)
p.batchingStrategy = bs

p.commitStoreReader = ccipdatamocks.NewCommitStoreReader(t)
chainHealthcheck := ccipcachemocks.NewChainHealthcheck(t)
Expand All @@ -281,12 +302,12 @@ func TestExecutionReportingPlugin_Report(t *testing.T) {
observations[i] = types.AttributedObservation{Observation: b, Observer: commontypes.OracleID(i + 1)}
}

_, _, err := p.Report(ctx, types.ReportTimestamp{}, types.Query{}, observations)
_, _, err2 := p.Report(ctx, types.ReportTimestamp{}, types.Query{}, observations)
if tc.expectingSomeErr {
assert.Error(t, err)
assert.Error(t, err2)
return
}
assert.NoError(t, err)
assert.NoError(t, err2)
})
}
}
Expand Down Expand Up @@ -1422,3 +1443,37 @@ func TestExecutionReportingPlugin_ensurePriceRegistrySynchronization(t *testing.
require.NoError(t, err)
require.Equal(t, mockPriceRegistryReader2, p.sourcePriceRegistry)
}

func TestExecutionReportingPlugin_getConsensusThreshold(t *testing.T) {
tests := []struct {
name string
batchingStrategyID uint32
F int
expectedConsensusThreshold int
}{
{
name: "zk batching strategy",
batchingStrategyID: uint32(1),
F: 5,
expectedConsensusThreshold: 11,
},
{
name: "default batching strategy",
batchingStrategyID: uint32(0),
F: 5,
expectedConsensusThreshold: 6,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
p := &ExecutionReportingPlugin{}
p.F = tc.F
bs, err := NewBatchingStrategy(tc.batchingStrategyID, &statuschecker.TxmStatusChecker{})
assert.NoError(t, err)
p.batchingStrategy = bs

require.Equal(t, tc.expectedConsensusThreshold, p.getConsensusThreshold())
})
}
}

0 comments on commit c8c7d92

Please sign in to comment.