Skip to content

Commit

Permalink
*: support extra dBFT stage
Browse files Browse the repository at this point in the history
Ref. #112.

Signed-off-by: Anna Shaleva <[email protected]>
  • Loading branch information
AnnaShaleva committed Jul 17, 2024
1 parent 377942e commit c2f52d3
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 11 deletions.
64 changes: 64 additions & 0 deletions check.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,70 @@ func (d *DBFT[H]) checkCommit() {
return
}

// TODO: it should be considered: add PreCommit message instead of CommitAck and
// send this PreCommit *before* Commit. This will allow to create a completely
// custom message for keys exchange whereas the final Commit will keep only final
// signature bytes, as with usual dBFT.
if d.isAntiMEVExtensionEnabled() {
d.preBlock = d.CreatePreBlock()
hash := d.preBlock.Hash()

d.Logger.Info("processing PreBlock",
zap.Uint32("height", d.BlockIndex),
zap.Stringer("preBlock hash", hash),
zap.Int("tx_count", len(d.preBlock.Transactions())))

d.preBlockProcessed = true
d.ProcessPreBlock(d.preBlock)

if d.CommitSent() { // TODO: Do we really need to require Commit sent by *self* or M other's Commits is enough to sent CommitAck? It depends on the keys sharing logic
d.sendCommitAck()
d.changeTimer(d.SecondsPerBlock)
d.checkCommitAck()
} else {
d.Logger.Debug("can't send commitAck since self commit not yet sent")
}
return
}

d.lastBlockIndex = d.BlockIndex
d.lastBlockTime = d.Timer.Now()
d.block = d.CreateBlock()
hash := d.block.Hash()

d.Logger.Info("approving block",
zap.Uint32("height", d.BlockIndex),
zap.Stringer("hash", hash),
zap.Int("tx_count", len(d.block.Transactions())),
zap.Stringer("merkle", d.block.MerkleRoot()),
zap.Stringer("prev", d.block.PrevHash()))

d.blockProcessed = true
d.ProcessBlock(d.block)

// Do not initialize consensus process immediately. It's the caller's duty to
// start the new block acceptance process and call Reset at the
// new height.
}

func (d *DBFT[H]) checkCommitAck() {
if !d.hasAllTransactions() {
d.Logger.Debug("check commit: some transactions are missing", zap.Any("hashes", d.MissingTransactions))
return
}

count := 0
for _, msg := range d.CommitAckPayloads {
if msg != nil && msg.ViewNumber() == d.ViewNumber {
count++
}
}

if count < d.M() {
d.Logger.Debug("not enough to commit", zap.Int("count", count))
return
}

d.lastBlockIndex = d.BlockIndex
d.lastBlockTime = d.Timer.Now()
d.block = d.CreateBlock()
Expand Down
4 changes: 3 additions & 1 deletion commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package dbft
// Commit is an interface for dBFT Commit message.
type Commit interface {
// Signature returns commit's signature field
// which is a block signature for the current epoch.
// which is a block signature for the current epoch (in case of dBFT 2.0)
// or a data CNs need to exchange with to move further to CommitAck phase
// (in case of anti-MEV dBFT version).
Signature() []byte
}
8 changes: 8 additions & 0 deletions commit_ack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dbft

// CommitAck is an interface for dBFT CommitAck message.
type CommitAck interface {
// Data returns commitAck's data that should be used for the final
// block construction.
Data() []byte
}
54 changes: 54 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ type Config[H Hash] struct {
// if current time is less than that of previous context.
// By default use millisecond precision.
TimestampIncrement uint64
// AntiMEVExtensionEnablingHeight denotes the height starting from which dBFT
// Anti-MEV extensions should be enabled. -1 means no extension is enabled.
AntiMEVExtensionEnablingHeight int64
// GetKeyPair returns an index of the node in the list of validators
// together with it's key pair.
GetKeyPair func([]PublicKey) (int, PrivateKey, PublicKey)
// NewPreBlockFromContext should allocate, fill from Context and return new block.PreBlock.
NewPreBlockFromContext func(ctx *Context[H]) PreBlock[H]
// NewBlockFromContext should allocate, fill from Context and return new block.Block.
NewBlockFromContext func(ctx *Context[H]) Block[H]
// RequestTx is a callback which is called when transaction contained
Expand All @@ -40,6 +45,8 @@ type Config[H Hash] struct {
VerifyBlock func(b Block[H]) bool
// Broadcast should broadcast payload m to the consensus nodes.
Broadcast func(m ConsensusPayload[H])
// ProcessBlock is called every time new preBlock is accepted.
ProcessPreBlock func(b PreBlock[H])
// ProcessBlock is called every time new block is accepted.
ProcessBlock func(b Block[H])
// GetBlock should return block with hash.
Expand All @@ -65,6 +72,8 @@ type Config[H Hash] struct {
NewChangeView func(newViewNumber byte, reason ChangeViewReason, timestamp uint64) ChangeView
// NewCommit is a constructor for payload.Commit.
NewCommit func(signature []byte) Commit
// NewCommitAck is a constructor for payload.CommitAck.
NewCommitAck func(signature []byte) CommitAck
// NewRecoveryRequest is a constructor for payload.RecoveryRequest.
NewRecoveryRequest func(ts uint64) RecoveryRequest
// NewRecoveryMessage is a constructor for payload.RecoveryMessage.
Expand All @@ -73,6 +82,7 @@ type Config[H Hash] struct {
VerifyPrepareRequest func(p ConsensusPayload[H]) error
// VerifyPrepareResponse performs external PrepareResponse verification and returns nil if it's successful.
VerifyPrepareResponse func(p ConsensusPayload[H]) error
// TODO: may be VerifyCommit callback should be added for case when AntiMEV extensions are enabled.
}

const defaultSecondsPerBlock = time.Second * 15
Expand Down Expand Up @@ -101,6 +111,8 @@ func defaultConfig[H Hash]() *Config[H] {

VerifyPrepareRequest: func(ConsensusPayload[H]) error { return nil },
VerifyPrepareResponse: func(ConsensusPayload[H]) error { return nil },

AntiMEVExtensionEnablingHeight: -1,
}
}

Expand Down Expand Up @@ -131,6 +143,20 @@ func checkConfig[H Hash](cfg *Config[H]) error {
return errors.New("NewRecoveryRequest is nil")
} else if cfg.NewRecoveryMessage == nil {
return errors.New("NewRecoveryMessage is nil")
} else if cfg.AntiMEVExtensionEnablingHeight >= 0 {
if cfg.NewPreBlockFromContext == nil {
return errors.New("NewPreBlockFromContext is nil")
} else if cfg.ProcessPreBlock == nil {
return errors.New("ProcessPreBlock is nil")
} else if cfg.NewCommitAck == nil {
return errors.New("NewCommitAck is nil")
}
} else if cfg.NewPreBlockFromContext != nil {
return errors.New("NewPreBlockFromContext is set, but AntiMEVExtensionEnablingHeight is not specified")
} else if cfg.ProcessPreBlock != nil {
return errors.New("ProcessPreBlock is set, but AntiMEVExtensionEnablingHeight is not specified")
} else if cfg.NewCommitAck != nil {
return errors.New("NewCommitAck is set, but AntiMEVExtensionEnablingHeight is not specified")
}

return nil
Expand Down Expand Up @@ -164,13 +190,27 @@ func WithSecondsPerBlock[H Hash](d time.Duration) func(config *Config[H]) {
}
}

// WithAntiMEVExtensionEnablingHeight sets AntiMEVExtensionEnablingHeight.
func WithAntiMEVExtensionEnablingHeight[H Hash](h int64) func(config *Config[H]) {
return func(cfg *Config[H]) {
cfg.AntiMEVExtensionEnablingHeight = h
}
}

// WithTimestampIncrement sets TimestampIncrement.
func WithTimestampIncrement[H Hash](u uint64) func(config *Config[H]) {
return func(cfg *Config[H]) {
cfg.TimestampIncrement = u
}
}

// WithNewPreBlockFromContext sets NewPreBlockFromContext.
func WithNewPreBlockFromContext[H Hash](f func(ctx *Context[H]) PreBlock[H]) func(config *Config[H]) {
return func(cfg *Config[H]) {
cfg.NewPreBlockFromContext = f
}
}

// WithNewBlockFromContext sets NewBlockFromContext.
func WithNewBlockFromContext[H Hash](f func(ctx *Context[H]) Block[H]) func(config *Config[H]) {
return func(cfg *Config[H]) {
Expand Down Expand Up @@ -227,6 +267,13 @@ func WithProcessBlock[H Hash](f func(b Block[H])) func(config *Config[H]) {
}
}

// WithProcessPreBlock sets ProcessPreBlock.
func WithProcessPreBlock[H Hash](f func(b PreBlock[H])) func(config *Config[H]) {
return func(cfg *Config[H]) {
cfg.ProcessPreBlock = f
}
}

// WithGetBlock sets GetBlock.
func WithGetBlock[H Hash](f func(h H) Block[H]) func(config *Config[H]) {
return func(cfg *Config[H]) {
Expand Down Expand Up @@ -297,6 +344,13 @@ func WithNewCommit[H Hash](f func(signature []byte) Commit) func(config *Config[
}
}

// WithNewCommitAck sets NewCommitAck.
func WithNewCommitAck[H Hash](f func(signature []byte) CommitAck) func(config *Config[H]) {
return func(cfg *Config[H]) {
cfg.NewCommitAck = f
}
}

// WithNewRecoveryRequest sets NewRecoveryRequest.
func WithNewRecoveryRequest[H Hash](f func(ts uint64) RecoveryRequest) func(config *Config[H]) {
return func(cfg *Config[H]) {
Expand Down
2 changes: 2 additions & 0 deletions consensus_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type ConsensusMessage[H Hash] interface {
GetPrepareResponse() PrepareResponse[H]
// GetCommit returns payload as if it was Commit.
GetCommit() Commit
// GetCommitAck returns payload as if it was CommitAck.
GetCommitAck() CommitAck
// GetRecoveryRequest returns payload as if it was RecoveryRequest.
GetRecoveryRequest() RecoveryRequest
// GetRecoveryMessage returns payload as if it was RecoveryMessage.
Expand Down
3 changes: 3 additions & 0 deletions consensus_message_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
PrepareRequestType MessageType = 0x20
PrepareResponseType MessageType = 0x21
CommitType MessageType = 0x30
CommitAckType MessageType = 0x31
RecoveryRequestType MessageType = 0x40
RecoveryMessageType MessageType = 0x41
)
Expand All @@ -26,6 +27,8 @@ func (m MessageType) String() string {
return "PrepareResponse"
case CommitType:
return "Commit"
case CommitAckType:
return "CommitAck"
case RecoveryRequestType:
return "RecoveryRequest"
case RecoveryMessageType:
Expand Down
71 changes: 69 additions & 2 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ type Context[H Hash] struct {
// Pub is node's public key.
Pub PublicKey

block Block[H]
header Block[H]
preBlock PreBlock[H]
preHeader PreBlock[H]
block Block[H]
header Block[H]
// blockProcessed denotes whether Config.ProcessBlock callback was called for the current
// height. If so, then no second call must happen. After new block is received by the user,
// dBFT stops any new transaction or messages processing as far as timeouts handling till
// the next call to Reset.
blockProcessed bool
// TODO: add a comment, t has another meaning than blockProcessed.
preBlockProcessed bool

// BlockIndex is current block index.
BlockIndex uint32
Expand Down Expand Up @@ -65,6 +69,15 @@ type Context[H Hash] struct {
// current round, so it's possible to verify Commit against it) or stored till
// the corresponding PrepareRequest receiving.
CommitPayloads []ConsensusPayload[H]
// CommitAckPayloads stores consensus CommitAck payloads sent through all epochs.
// It is assumed that valid CommitAck payloads can only be sent once by a single
// node per the whole set of consensus epochs for particular block. Invalid
// CommitAck payloads are kicked off this list immediately (if Commit
// [TODO: and PrepareRequest? How do we verify CommitAck payloads: based on
// Commit only or based on PrepareRequest?] was received for the current round,
// so it's possible to verify CommitAck against it) or stored till the
// corresponding Commit receiving.
CommitAckPayloads []ConsensusPayload[H]
// ChangeViewPayloads stores consensus ChangeView payloads for the current epoch.
ChangeViewPayloads []ConsensusPayload[H]
// LastChangeViewPayloads stores consensus ChangeView payloads for the last epoch.
Expand Down Expand Up @@ -149,6 +162,12 @@ func (c *Context[H]) CommitSent() bool {
return !c.WatchOnly() && c.CommitPayloads[c.MyIndex] != nil
}

// CommitAckSent returns true iff CommitAck message was sent for the current epoch
// assuming that the node can't go further than current epoch after commit was sent.
func (c *Context[H]) CommitAckSent() bool {
return !c.WatchOnly() && c.CommitAckPayloads[c.MyIndex] != nil
}

// BlockSent returns true iff block was formed AND sent for the current height.
// Once block is sent, the consensus stops new transactions and messages processing
// as far as timeouts handling.
Expand Down Expand Up @@ -227,6 +246,7 @@ func (c *Context[H]) reset(view byte, ts uint64) {
c.ChangeViewPayloads = make([]ConsensusPayload[H], n)
if view == 0 {
c.CommitPayloads = make([]ConsensusPayload[H], n)
c.CommitAckPayloads = make([]ConsensusPayload[H], n)
}
c.PreparationPayloads = make([]ConsensusPayload[H], n)

Expand Down Expand Up @@ -285,24 +305,71 @@ func (c *Context[H]) CreateBlock() Block[H] {
}

c.block.SetTransactions(txx)

// TODO: do we really need this? CreateBlock will be called when all decryption data are available, thus we may
// add all necessary information in MakeHeader or in SetTransactions. For now, I'd skip it.
//if c.isAntiMEVExtensionEnabled() {
// c.block.Finalize()
//}
}

return c.block
}

// CreatePreBlock returns PreBlock for the current epoch.
func (c *Context[H]) CreatePreBlock() PreBlock[H] {
if c.preBlock == nil {
if c.preBlock = c.MakePreHeader(); c.preBlock == nil {
return nil
}

txx := make([]Transaction[H], len(c.TransactionHashes))

for i, h := range c.TransactionHashes {
txx[i] = c.Transactions[h]
}

c.preBlock.SetTransactions(txx)
}

return c.preBlock
}

// isAntiMEVExtensionEnabled returns whether Anti-MEV dBFT extension is enabled
// at the currently processing block height.
func (c *Context[H]) isAntiMEVExtensionEnabled() bool {
return c.Config.AntiMEVExtensionEnablingHeight >= 0 && uint32(c.Config.AntiMEVExtensionEnablingHeight) < c.BlockIndex
}

// MakeHeader returns half-filled block for the current epoch.
// All hashable fields will be filled.
func (c *Context[H]) MakeHeader() Block[H] {
if c.header == nil {
if !c.RequestSentOrReceived() {
return nil
}
if c.isAntiMEVExtensionEnabled() && c.CountCommitted() < c.M() { // TODO: replace with count committed for *my* view?
return nil
}
c.header = c.Config.NewBlockFromContext(c)
}

return c.header
}

// MakePreHeader returns half-filled block for the current epoch.
// All hashable fields will be filled.
func (c *Context[H]) MakePreHeader() PreBlock[H] {
if c.preHeader == nil {
if !c.RequestSentOrReceived() {
return nil
}
c.preHeader = c.Config.NewPreBlockFromContext(c)
}

return c.preHeader
}

// hasAllTransactions returns true iff all transactions were received
// for the proposed block.
func (c *Context[H]) hasAllTransactions() bool {
Expand Down
Loading

0 comments on commit c2f52d3

Please sign in to comment.