Skip to content

Commit

Permalink
Reduce Preprepare size (#1924)
Browse files Browse the repository at this point in the history
* Log line from Warn to Debug

* Bump version to 1.5.8

* New encoding for RoundChangeCertificate

* clean up NewIndexedRoundChangeMessage

* Use pointers whenever possible to reduce call by value costs

* Merge

* Fix compile error

* Added comment

* Added docs

* Add error checking

* Add error

* Clean up encoding logic

* more cleanup

* Fix compile errors

* Added doc

* Fix dummy object in test

* Fix

* Fix tests

* Remove nil

* fix proposal not rlp serializable

* fix test

* Add tests

* Fix lint

* Fix test

* fix test

* Fix test

* Fix

* Add log

* Revert change

* cleanup (#1925)

* cleanup

* revert

* Fix debug log

Co-authored-by: Pasto <[email protected]>
Co-authored-by: Gaston Ponti <[email protected]>
  • Loading branch information
3 people authored Jul 14, 2022
1 parent 1e2a6a7 commit 152b6dc
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 19 deletions.
181 changes: 166 additions & 15 deletions consensus/istanbul/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/crypto"
blscrypto "github.com/celo-org/celo-blockchain/crypto/bls"
"github.com/celo-org/celo-blockchain/log"
"github.com/celo-org/celo-blockchain/p2p/enode"
"github.com/celo-org/celo-blockchain/rlp"
)
Expand Down Expand Up @@ -126,6 +127,10 @@ func (v *View) Cmp(y *View) int {
}

// ## RoundChangeCertificate ##############################################################
// To considerably reduce the bandwidth used by the RoundChangeCertificate type (which often
// contains repeated Proposal from different RoundChange messages), we break it apart during
// RLP encoding and then build it back during decoding. Proposals are sent just once, and
// Messages referencing them will use their Hash instead.

type RoundChangeCertificate struct {
RoundChangeMessages []Message
Expand All @@ -135,6 +140,134 @@ func (b *RoundChangeCertificate) IsEmpty() bool {
return len(b.RoundChangeMessages) == 0
}

// EncodeRLP serializes b into the Ethereum RLP format.
func (c *RoundChangeCertificate) EncodeRLP(w io.Writer) error {
proposals, messages, err := c.asValues()
if err != nil {
return err
}
log.Debug("Round change certificate proposals", "count", len(proposals))
return rlp.Encode(w, []interface{}{proposals, messages})
}

// DecodeRLP implements rlp.Decoder, and load the consensus fields from a RLP stream.
func (c *RoundChangeCertificate) DecodeRLP(s *rlp.Stream) error {
var decodestr struct {
Proposals []*types.Block
IndexedMessages []IndexedRoundChangeMessage
}

if err := s.Decode(&decodestr); err != nil {
return err
}
return c.setValues(decodestr.Proposals, decodestr.IndexedMessages)
}

// setValues recreates the RoundChange messages from the props (Proposal set/index) and the
// list of IndexedRoundChangeMessage, which is supposed to be the same as the RoundChange
// Messages but with the proposals just referenced to the Proposals set.
func (c *RoundChangeCertificate) setValues(props []*types.Block, iMess []IndexedRoundChangeMessage) error {
// create a Proposal index from the list
propIndex := make(map[common.Hash]Proposal)
for _, prop := range props {
propIndex[prop.Hash()] = prop
}
// Recreate Messages one by one
mess := make([]Message, len(iMess))
for i, im := range iMess {
mess[i] = Message{
Code: im.Message.Code,
Address: im.Message.Address,
Signature: im.Message.Signature,
}

// Add the proposal to the message if it had one
roundChange, err := im.Message.TryRoundChange()
if err != nil {
return err
}

if proposal, ok := propIndex[im.ProposalHash]; ok {
roundChange.PreparedCertificate.Proposal = proposal
}

setMessageBytes(&mess[i], roundChange)
mess[i].roundChange = roundChange
}
c.RoundChangeMessages = mess
return nil
}

type IndexedRoundChangeMessage struct {
ProposalHash common.Hash
Message Message // PreparedCertificate.Proposal = nil if any
}

// asValues presents the RoundChangeCertificate as values for RLP Serialization.
// This is done using a list of proposals, and the RoundChange messages using
// hash references instead of the full proposal objects, to reduce bandwidth.
func (c *RoundChangeCertificate) asValues() ([]*types.Block, []*IndexedRoundChangeMessage, error) {
var err error

messages := make([]*IndexedRoundChangeMessage, len(c.RoundChangeMessages))
proposalsMap := make(map[common.Hash]*types.Block)

for i, message := range c.RoundChangeMessages {
var proposal *types.Block
proposal, messages[i], err = extractProposal(&message)
if err != nil {
return nil, nil, err
}

if proposal != nil {
// we don't use the height since we know they MUST be the same
proposalsMap[proposal.Hash()] = proposal
}
}

// Iterate values. RLP does not support maps
proposals := make([]*types.Block, len(proposalsMap))
var i = 0
for _, p := range proposalsMap {
proposals[i] = p
i++
}
return proposals, messages, nil
}

func extractProposal(message *Message) (*types.Block, *IndexedRoundChangeMessage, error) {
roundChange, err := message.TryRoundChange()
if err != nil {
return nil, nil, err
}

pc := roundChange.PreparedCertificate

// Assume message.Code = MsgRoundChange
indexedMsg := IndexedRoundChangeMessage{
Message: Message{
Code: message.Code,
Address: message.Address,
Signature: message.Signature,
},
}

if pc.Proposal != nil {
indexedMsg.ProposalHash = pc.Proposal.Hash()
}

curatedPC := EmptyPreparedCertificate()
curatedPC.PrepareOrCommitMessages = pc.PrepareOrCommitMessages

setMessageBytes(&indexedMsg.Message,
&RoundChange{
View: roundChange.View,
PreparedCertificate: curatedPC,
})

return pc.Proposal.(*types.Block), &indexedMsg, nil
}

// ## Preprepare ##############################################################

// NewPreprepareMessage constructs a Message instance with the given sender and
Expand Down Expand Up @@ -522,7 +655,7 @@ type Message struct {
func setMessageBytes(msg *Message, innerMessage interface{}) {
bytes, err := rlp.EncodeToBytes(innerMessage)
if err != nil {
panic(fmt.Sprintf("attempt to serialise inner message of type %T failed", innerMessage))
panic(fmt.Sprintf("attempt to serialise inner message of type %T failed. %s", innerMessage, err))
}
msg.Msg = bytes
}
Expand All @@ -537,20 +670,8 @@ func (m *Message) Sign(signingFn func(data []byte) ([]byte, error)) error {
return err
}

func (m *Message) DecodeRLP(stream *rlp.Stream) error {
type decodable Message
var d decodable
err := stream.Decode(&d)
if err != nil {
return err
}
*m = Message(d)

if len(m.Msg) == 0 && len(m.Signature) == 0 {
// Empty validator handshake message
return nil
}

func (m *Message) DecodeMessage() error {
var err error
switch m.Code {
case MsgPreprepare:
var p *Preprepare
Expand Down Expand Up @@ -598,7 +719,23 @@ func (m *Message) DecodeRLP(stream *rlp.Stream) error {
err = fmt.Errorf("unrecognised message code %d", m.Code)
}
return err
}

func (m *Message) DecodeRLP(stream *rlp.Stream) error {
type decodable Message
var d decodable
err := stream.Decode(&d)
if err != nil {
return err
}
*m = Message(d)

if len(m.Msg) == 0 && len(m.Signature) == 0 {
// Empty validator handshake message
return nil
}

return m.DecodeMessage()
}

// FromPayload decodes b into a Message instance it will set one of the private
Expand Down Expand Up @@ -666,6 +803,20 @@ func (m *Message) Prepare() *Subject {
return m.prepare
}

// Prepare returns round change if this is a round change message.
func (m *Message) TryRoundChange() (*RoundChange, error) {
if m.roundChange != nil {
return m.roundChange, nil
}
if m.Code != MsgRoundChange {
return nil, fmt.Errorf("expected round change message, received code: %d", m.Code)
}
if err := m.DecodeMessage(); err != nil {
return nil, err
}
return m.roundChange, nil
}

// Prepare returns round change if this is a round change message.
func (m *Message) RoundChange() *RoundChange {
return m.roundChange
Expand Down
61 changes: 60 additions & 1 deletion consensus/istanbul/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/rlp"
"golang.org/x/crypto/sha3"
"gotest.tools/assert"
)

// testHasher is the helper tool for transaction/receipt list hashing.
Expand Down Expand Up @@ -134,9 +135,29 @@ func dummyMessage(code uint64) *Message {
return msg
}

func dummyRoundChangeMessage() *Message {
msg := NewPrepareMessage(dummySubject(), common.HexToAddress("AABB"))
// Set empty rather than nil signature since this is how rlp decodes non
// existent slices.
msg.Signature = []byte{}
msg.Code = MsgRoundChange
roundChange := &RoundChange{
View: &View{
Round: common.Big1,
Sequence: common.Big2,
},
PreparedCertificate: PreparedCertificate{
PrepareOrCommitMessages: []Message{},
Proposal: dummyBlock(2),
},
}
setMessageBytes(msg, roundChange)
return msg
}

func dummyRoundChangeCertificate() *RoundChangeCertificate {
return &RoundChangeCertificate{
RoundChangeMessages: []Message{*dummyMessage(42), *dummyMessage(32), *dummyMessage(15)},
RoundChangeMessages: []Message{*dummyRoundChangeMessage(), *dummyRoundChangeMessage(), *dummyRoundChangeMessage()},
}
}

Expand Down Expand Up @@ -202,6 +223,35 @@ func TestRoundChangeCertificateRLPEncoding(t *testing.T) {
t.Fatalf("Error %v", err)
}

assert.Equal(t, len(original.RoundChangeMessages), len(original.RoundChangeMessages))
o1 := original.RoundChangeMessages[0]
r1 := result.RoundChangeMessages[0]
if !reflect.DeepEqual(o1.Code, r1.Code) {
t.Fatalf("RLP Encode/Decode mismatch at first Code")
}

if !reflect.DeepEqual(o1.Code, r1.Code) {
t.Fatalf("RLP Encode/Decode mismatch at first Code")
}

if !reflect.DeepEqual(o1.Address, r1.Address) {
t.Fatalf("RLP Encode/Decode mismatch at first Address")
}

if !reflect.DeepEqual(o1.Signature, r1.Signature) {
t.Fatalf("RLP Encode/Decode mismatch at first Signature")
}

if !reflect.DeepEqual(o1.Msg, r1.Msg) {
t.Fatalf("RLP Encode/Decode mismatch at first internal Msg bytes. %v ----- %v", o1.Msg, r1.Msg)
}

original.RoundChangeMessages[0].prepare = nil
original.RoundChangeMessages[1].prepare = nil
original.RoundChangeMessages[2].prepare = nil
result.RoundChangeMessages[0].roundChange = nil
result.RoundChangeMessages[1].roundChange = nil
result.RoundChangeMessages[2].roundChange = nil
if !reflect.DeepEqual(original, result) {
t.Fatalf("RLP Encode/Decode mismatch. Got %v, expected %v", result, original)
}
Expand All @@ -224,6 +274,15 @@ func TestPreprepareRLPEncoding(t *testing.T) {
t.Fatalf("Error %v", err)
}

o := original.RoundChangeCertificate
o.RoundChangeMessages[0].prepare = nil
o.RoundChangeMessages[1].prepare = nil
o.RoundChangeMessages[2].prepare = nil
r := result.RoundChangeCertificate
r.RoundChangeMessages[0].roundChange = nil
r.RoundChangeMessages[1].roundChange = nil
r.RoundChangeMessages[2].roundChange = nil

// decoded Blocks don't equal Original ones so we need to check equality differently
assertEqual(t, "RLP Encode/Decode mismatch: View", result.View, original.View)
assertEqual(t, "RLP Encode/Decode mismatch: RoundChangeCertificate", result.RoundChangeCertificate, original.RoundChangeCertificate)
Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func nodeInfo(chain *core.BlockChain, network uint64) *NodeInfo {
func Handle(backend Backend, peer *Peer) error {
for {
if err := handleMessage(backend, peer); err != nil {
peer.Log().Warn("Message handling failed in `eth`", "err", err)
peer.Log().Debug("Message handling failed in `eth`", "err", err)
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const ProtocolName = "eth"
var ProtocolVersions = []uint{ETH66, ETH65}

// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024 * 10 // Celo increase for istanbul round change certificates
const maxMessageSize = 10 * 1024 * 1024

const (
// Protocol messages in eth/64 (celo65)
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
const (
VersionMajor = 1 // Major version component of the current release
VersionMinor = 5 // Minor version component of the current release
VersionPatch = 7 // Patch version component of the current release
VersionPatch = 8 // Patch version component of the current release
VersionMeta = "stable" // Version metadata to append to the version string
)

Expand Down

0 comments on commit 152b6dc

Please sign in to comment.