Skip to content

Commit

Permalink
Implement GPBFT message compression using zstd
Browse files Browse the repository at this point in the history
Add the ability to compress GPBFT messages controllable via manifest.
Implement benchmarks to compare vanilla CBOR and ZSTD encoding.

Basic local run:
```
BenchmarkCborEncoding-12    	   47173	     25491 ns/op	  135409 B/op	      87 allocs/op
BenchmarkCborDecoding-12    	   64550	     18078 ns/op	   61728 B/op	     209 allocs/op
BenchmarkZstdEncoding-12    	   29061	     41489 ns/op	  193455 B/op	      88 allocs/op
BenchmarkZstdDecoding-12    	   66172	     17924 ns/op	  176517 B/op	     211 allocs/op
```

Fixes #786
  • Loading branch information
masih committed Dec 12, 2024
1 parent 1b17c6c commit ea3ad13
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 14 deletions.
26 changes: 26 additions & 0 deletions f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,31 @@ func TestF3DynamicManifest_WithPauseAndRebootstrap(t *testing.T) {
require.Equal(t, env.manifest.BootstrapEpoch-env.manifest.EC.Finality, cert0.ECChain.Base().Epoch)
}

func TestF3DynamicManifest_RebootstrapWithCompression(t *testing.T) {
env := newTestEnvironment(t).withNodes(2).withDynamicManifest().start()
env.waitForInstanceNumber(10, 30*time.Second, true)

env.manifest.Pause = true
env.updateManifest()

env.waitForNodesStopped()

env.manifest.BootstrapEpoch = 956
env.manifest.PubSub.CompressionEnabled = true
env.manifest.Pause = false
env.updateManifest()
env.waitForManifest()

env.clock.Add(1 * time.Minute)

env.waitForInstanceNumber(3, 30*time.Second, true)
env.requireEqualManifests(true)

cert0, err := env.nodes[0].f3.GetCert(env.testCtx, 0)
require.NoError(t, err)
require.Equal(t, env.manifest.BootstrapEpoch-env.manifest.EC.Finality, cert0.ECChain.Base().Epoch)
}

func TestF3LateBootstrap(t *testing.T) {
env := newTestEnvironment(t).withNodes(2).start()

Expand Down Expand Up @@ -286,6 +311,7 @@ var base = manifest.Manifest{
EC: manifest.DefaultEcConfig,
CertificateExchange: manifest.DefaultCxConfig,
CatchUpAlignment: manifest.DefaultCatchUpAlignment,
PubSub: manifest.DefaultPubSubConfig,
}

type testNode struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/klauspost/compress v1.17.11
github.com/libp2p/go-libp2p v0.37.2
github.com/libp2p/go-libp2p-pubsub v0.11.0
github.com/marcboeker/go-duckdb v1.8.2
Expand Down Expand Up @@ -67,7 +68,6 @@ require (
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down
35 changes: 22 additions & 13 deletions host.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package f3

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -51,7 +50,8 @@ type gpbftRunner struct {
msgsMutex sync.Mutex
selfMessages map[uint64]map[roundPhase][]*gpbft.GMessage

inputs gpbftInputs
inputs gpbftInputs
msgEncoding gMessageEncoding
}

type roundPhase struct {
Expand Down Expand Up @@ -132,6 +132,15 @@ func newRunner(
return nil, fmt.Errorf("creating participant: %w", err)
}
runner.participant = p

if runner.manifest.PubSub.CompressionEnabled {
runner.msgEncoding, err = newZstdGMessageEncoding()
if err != nil {
return nil, err
}

Check warning on line 140 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L139-L140

Added lines #L139 - L140 were not covered by tests
} else {
runner.msgEncoding = &cborGMessageEncoding{}
}
return runner, nil
}

Expand Down Expand Up @@ -443,13 +452,12 @@ func (h *gpbftRunner) BroadcastMessage(ctx context.Context, msg *gpbft.GMessage)
if h.topic == nil {
return pubsub.ErrTopicClosed
}
var bw bytes.Buffer
err = msg.MarshalCBOR(&bw)
encoded, err := h.msgEncoding.Encode(msg)
if err != nil {
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
return fmt.Errorf("encoding GMessage for broadcast: %w", err)

Check warning on line 457 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L457

Added line #L457 was not covered by tests
}

err = h.topic.Publish(ctx, bw.Bytes())
err = h.topic.Publish(ctx, encoded)
if err != nil {
return fmt.Errorf("publishing message: %w", err)
}
Expand All @@ -464,11 +472,11 @@ func (h *gpbftRunner) rebroadcastMessage(msg *gpbft.GMessage) error {
if h.topic == nil {
return pubsub.ErrTopicClosed
}
var bw bytes.Buffer
if err := msg.MarshalCBOR(&bw); err != nil {
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
encoded, err := h.msgEncoding.Encode(msg)
if err != nil {
return fmt.Errorf("encoding GMessage for broadcast: %w", err)

Check warning on line 477 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L477

Added line #L477 was not covered by tests
}
if err := h.topic.Publish(h.runningCtx, bw.Bytes()); err != nil {
if err := h.topic.Publish(h.runningCtx, encoded); err != nil {
return fmt.Errorf("publishing message: %w", err)
}
return nil
Expand All @@ -481,12 +489,13 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
recordValidationTime(ctx, start, _result)
}(time.Now())

var gmsg gpbft.GMessage
if err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data)); err != nil {
gmsg, err := h.msgEncoding.Decode(msg.Data)
if err != nil {
log.Debugw("failed to decode message", "from", msg.GetFrom(), "err", err)

Check warning on line 494 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L494

Added line #L494 was not covered by tests
return pubsub.ValidationReject
}

switch validatedMessage, err := h.participant.ValidateMessage(&gmsg); {
switch validatedMessage, err := h.participant.ValidateMessage(gmsg); {
case errors.Is(err, gpbft.ErrValidationInvalid):
log.Debugf("validation error during validation: %+v", err)
return pubsub.ValidationReject
Expand Down
16 changes: 16 additions & 0 deletions manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ var (
MaximumPollInterval: 4 * DefaultEcConfig.Period,
}

DefaultPubSubConfig = PubSubConfig{
CompressionEnabled: false,
}

// Default instance alignment when catching up.
DefaultCatchUpAlignment = DefaultEcConfig.Period / 2
)
Expand Down Expand Up @@ -190,6 +194,12 @@ func (e *EcConfig) Validate() error {
return nil
}

type PubSubConfig struct {
CompressionEnabled bool
}

func (p *PubSubConfig) Validate() error { return nil }

// Manifest identifies the specific configuration for the F3 instance currently running.
type Manifest struct {
// Pause stops the participation in F3.
Expand Down Expand Up @@ -223,6 +233,8 @@ type Manifest struct {
EC EcConfig
// Certificate Exchange specific parameters
CertificateExchange CxConfig
// PubSubConfig specifies the pubsub related configuration.
PubSub PubSubConfig
}

func (m *Manifest) Equal(o *Manifest) bool {
Expand Down Expand Up @@ -285,6 +297,9 @@ func (m *Manifest) Validate() error {
if err := m.CertificateExchange.Validate(); err != nil {
return fmt.Errorf("invalid manifest: invalid certificate exchange config: %w", err)
}
if err := m.PubSub.Validate(); err != nil {
return fmt.Errorf("invalid manifest: invalid pubsub config: %w", err)
}

Check warning on line 302 in manifest/manifest.go

View check run for this annotation

Codecov / codecov/patch

manifest/manifest.go#L301-L302

Added lines #L301 - L302 were not covered by tests

return nil
}
Expand All @@ -301,6 +316,7 @@ func LocalDevnetManifest() *Manifest {
Gpbft: DefaultGpbftConfig,
CertificateExchange: DefaultCxConfig,
CatchUpAlignment: DefaultCatchUpAlignment,
PubSub: DefaultPubSubConfig,
}
return m
}
Expand Down
75 changes: 75 additions & 0 deletions msg_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package f3

import (
"bytes"

"github.com/filecoin-project/go-f3/gpbft"
"github.com/klauspost/compress/zstd"
)

var (
_ gMessageEncoding = (*cborGMessageEncoding)(nil)
_ gMessageEncoding = (*zstdGMessageEncoding)(nil)
)

type gMessageEncoding interface {
Encode(*gpbft.GMessage) ([]byte, error)
Decode([]byte) (*gpbft.GMessage, error)
}

type cborGMessageEncoding struct{}

func (c *cborGMessageEncoding) Encode(m *gpbft.GMessage) ([]byte, error) {
var buf bytes.Buffer
if err := m.MarshalCBOR(&buf); err != nil {
return nil, err
}

Check warning on line 26 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L25-L26

Added lines #L25 - L26 were not covered by tests
return buf.Bytes(), nil
}

func (c *cborGMessageEncoding) Decode(v []byte) (*gpbft.GMessage, error) {
r := bytes.NewReader(v)
var msg gpbft.GMessage
if err := msg.UnmarshalCBOR(r); err != nil {
return nil, err
}

Check warning on line 35 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L34-L35

Added lines #L34 - L35 were not covered by tests
return &msg, nil
}

type zstdGMessageEncoding struct {
cborEncoding cborGMessageEncoding
compressor *zstd.Encoder
decompressor *zstd.Decoder
}

func newZstdGMessageEncoding() (*zstdGMessageEncoding, error) {
writer, err := zstd.NewWriter(nil)
if err != nil {
return nil, err
}

Check warning on line 49 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L48-L49

Added lines #L48 - L49 were not covered by tests
reader, err := zstd.NewReader(nil)
if err != nil {
return nil, err
}

Check warning on line 53 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L52-L53

Added lines #L52 - L53 were not covered by tests
return &zstdGMessageEncoding{
compressor: writer,
decompressor: reader,
}, nil
}

func (c *zstdGMessageEncoding) Encode(m *gpbft.GMessage) ([]byte, error) {
cborEncoded, err := c.cborEncoding.Encode(m)
if err != nil {
return nil, err
}

Check warning on line 64 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L63-L64

Added lines #L63 - L64 were not covered by tests
compressed := c.compressor.EncodeAll(cborEncoded, make([]byte, 0, len(cborEncoded)))
return compressed, err
}

func (c *zstdGMessageEncoding) Decode(v []byte) (*gpbft.GMessage, error) {
cborEncoded, err := c.decompressor.DecodeAll(v, make([]byte, 0, len(v)))
if err != nil {
return nil, err
}

Check warning on line 73 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L72-L73

Added lines #L72 - L73 were not covered by tests
return c.cborEncoding.Decode(cborEncoded)
}
Loading

0 comments on commit ea3ad13

Please sign in to comment.