Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GPBFT message compression using zstd #793

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,31 @@ func TestF3DynamicManifest_WithPauseAndRebootstrap(t *testing.T) {
require.Equal(t, env.manifest.BootstrapEpoch-env.manifest.EC.Finality, cert0.ECChain.Base().Epoch)
}

// TestF3DynamicManifest_RebootstrapWithCompression starts a two-node network
// with a dynamic manifest, pauses it, and then resumes it with a new bootstrap
// epoch and PubSub compression switched on. It then checks that instances make
// progress again and that the first certificate is anchored at the expected
// epoch.
func TestF3DynamicManifest_RebootstrapWithCompression(t *testing.T) {
env := newTestEnvironment(t).withNodes(2).withDynamicManifest().start()
// Let the network make progress before changing the manifest.
// NOTE(review): the meaning of the trailing bool argument is not visible here — confirm against the helper.
env.waitForInstanceNumber(10, 30*time.Second, true)

// Pause participation through a manifest update.
env.manifest.Pause = true
env.updateManifest()

env.waitForNodesStopped()

// Resume with a fresh bootstrap epoch and compression enabled in the same
// manifest update.
env.manifest.BootstrapEpoch = 956
env.manifest.PubSub.CompressionEnabled = true
env.manifest.Pause = false
env.updateManifest()
env.waitForManifest()

// Advance the test clock by one minute.
// NOTE(review): presumably needed for the nodes to reach the new bootstrap epoch — confirm.
env.clock.Add(1 * time.Minute)

env.waitForInstanceNumber(3, 30*time.Second, true)
env.requireEqualManifests(true)

// The certificate for instance 0 must have a base epoch of
// BootstrapEpoch - EC.Finality under the updated manifest.
cert0, err := env.nodes[0].f3.GetCert(env.testCtx, 0)
require.NoError(t, err)
require.Equal(t, env.manifest.BootstrapEpoch-env.manifest.EC.Finality, cert0.ECChain.Base().Epoch)
}

func TestF3LateBootstrap(t *testing.T) {
env := newTestEnvironment(t).withNodes(2).start()

Expand Down Expand Up @@ -286,6 +311,7 @@ var base = manifest.Manifest{
EC: manifest.DefaultEcConfig,
CertificateExchange: manifest.DefaultCxConfig,
CatchUpAlignment: manifest.DefaultCatchUpAlignment,
PubSub: manifest.DefaultPubSubConfig,
}

type testNode struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/klauspost/compress v1.17.11
github.com/libp2p/go-libp2p v0.37.2
github.com/libp2p/go-libp2p-pubsub v0.11.0
github.com/marcboeker/go-duckdb v1.8.2
Expand Down Expand Up @@ -67,7 +68,6 @@ require (
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down
35 changes: 22 additions & 13 deletions host.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package f3

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -51,7 +50,8 @@
msgsMutex sync.Mutex
selfMessages map[uint64]map[roundPhase][]*gpbft.GMessage

inputs gpbftInputs
inputs gpbftInputs
msgEncoding gMessageEncoding
}

type roundPhase struct {
Expand Down Expand Up @@ -132,6 +132,15 @@
return nil, fmt.Errorf("creating participant: %w", err)
}
runner.participant = p

if runner.manifest.PubSub.CompressionEnabled {
runner.msgEncoding, err = newZstdGMessageEncoding()
if err != nil {
return nil, err
}

Check warning on line 140 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L139-L140

Added lines #L139 - L140 were not covered by tests
} else {
runner.msgEncoding = &cborGMessageEncoding{}
}
return runner, nil
}

Expand Down Expand Up @@ -443,13 +452,12 @@
if h.topic == nil {
return pubsub.ErrTopicClosed
}
var bw bytes.Buffer
err = msg.MarshalCBOR(&bw)
encoded, err := h.msgEncoding.Encode(msg)
if err != nil {
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
return fmt.Errorf("encoding GMessage for broadcast: %w", err)

Check warning on line 457 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L457

Added line #L457 was not covered by tests
}

err = h.topic.Publish(ctx, bw.Bytes())
err = h.topic.Publish(ctx, encoded)
if err != nil {
return fmt.Errorf("publishing message: %w", err)
}
Expand All @@ -464,11 +472,11 @@
if h.topic == nil {
return pubsub.ErrTopicClosed
}
var bw bytes.Buffer
if err := msg.MarshalCBOR(&bw); err != nil {
return fmt.Errorf("marshalling GMessage for broadcast: %w", err)
encoded, err := h.msgEncoding.Encode(msg)
if err != nil {
return fmt.Errorf("encoding GMessage for broadcast: %w", err)

Check warning on line 477 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L477

Added line #L477 was not covered by tests
}
if err := h.topic.Publish(h.runningCtx, bw.Bytes()); err != nil {
if err := h.topic.Publish(h.runningCtx, encoded); err != nil {
return fmt.Errorf("publishing message: %w", err)
}
return nil
Expand All @@ -481,12 +489,13 @@
recordValidationTime(ctx, start, _result)
}(time.Now())

var gmsg gpbft.GMessage
if err := gmsg.UnmarshalCBOR(bytes.NewReader(msg.Data)); err != nil {
gmsg, err := h.msgEncoding.Decode(msg.Data)
if err != nil {
log.Debugw("failed to decode message", "from", msg.GetFrom(), "err", err)

Check warning on line 494 in host.go

View check run for this annotation

Codecov / codecov/patch

host.go#L494

Added line #L494 was not covered by tests
return pubsub.ValidationReject
}

switch validatedMessage, err := h.participant.ValidateMessage(&gmsg); {
switch validatedMessage, err := h.participant.ValidateMessage(gmsg); {
case errors.Is(err, gpbft.ErrValidationInvalid):
log.Debugf("validation error during validation: %+v", err)
return pubsub.ValidationReject
Expand Down
16 changes: 16 additions & 0 deletions manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
MaximumPollInterval: 4 * DefaultEcConfig.Period,
}

DefaultPubSubConfig = PubSubConfig{
CompressionEnabled: false,
}

// Default instance alignment when catching up.
DefaultCatchUpAlignment = DefaultEcConfig.Period / 2
)
Expand Down Expand Up @@ -190,6 +194,12 @@
return nil
}

// PubSubConfig holds the pubsub related settings of a manifest.
type PubSubConfig struct {
	// CompressionEnabled selects whether GPBFT messages exchanged over pubsub
	// are compressed. When false, messages are sent as plain CBOR.
	CompressionEnabled bool
}

// Validate checks the pubsub configuration for consistency. There is currently
// nothing that can be misconfigured, so it always returns nil.
func (p *PubSubConfig) Validate() error {
	return nil
}

// Manifest identifies the specific configuration for the F3 instance currently running.
type Manifest struct {
// Pause stops the participation in F3.
Expand Down Expand Up @@ -223,6 +233,8 @@
EC EcConfig
// Certificate Exchange specific parameters
CertificateExchange CxConfig
// PubSubConfig specifies the pubsub related configuration.
PubSub PubSubConfig
}

func (m *Manifest) Equal(o *Manifest) bool {
Expand Down Expand Up @@ -285,6 +297,9 @@
if err := m.CertificateExchange.Validate(); err != nil {
return fmt.Errorf("invalid manifest: invalid certificate exchange config: %w", err)
}
if err := m.PubSub.Validate(); err != nil {

Check warning on line 300 in manifest/manifest.go

View check run for this annotation

Codecov / codecov/patch

manifest/manifest.go#L300

Added line #L300 was not covered by tests
return fmt.Errorf("invalid manifest: invalid pubsub config: %w", err)
}

return nil
}
Expand All @@ -301,6 +316,7 @@
Gpbft: DefaultGpbftConfig,
CertificateExchange: DefaultCxConfig,
CatchUpAlignment: DefaultCatchUpAlignment,
PubSub: DefaultPubSubConfig,
}
return m
}
Expand Down
75 changes: 75 additions & 0 deletions msg_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package f3

import (
"bytes"

"github.com/filecoin-project/go-f3/gpbft"
"github.com/klauspost/compress/zstd"
)

// Compile-time checks that both encodings implement gMessageEncoding.
var (
_ gMessageEncoding = (*cborGMessageEncoding)(nil)
_ gMessageEncoding = (*zstdGMessageEncoding)(nil)
)

// gMessageEncoding converts GMessages to and from their wire representation.
type gMessageEncoding interface {
	// Encode returns the wire representation of msg.
	Encode(msg *gpbft.GMessage) ([]byte, error)
	// Decode parses data, as produced by Encode, back into a GMessage.
	Decode(data []byte) (*gpbft.GMessage, error)
}

// cborGMessageEncoding is a stateless gMessageEncoding that represents
// GMessages as CBOR using their MarshalCBOR and UnmarshalCBOR methods.
type cborGMessageEncoding struct{}

func (c *cborGMessageEncoding) Encode(m *gpbft.GMessage) ([]byte, error) {
var buf bytes.Buffer
if err := m.MarshalCBOR(&buf); err != nil {
return nil, err
}

Check warning on line 26 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L25-L26

Added lines #L25 - L26 were not covered by tests
return buf.Bytes(), nil
}

func (c *cborGMessageEncoding) Decode(v []byte) (*gpbft.GMessage, error) {
r := bytes.NewReader(v)
var msg gpbft.GMessage
if err := msg.UnmarshalCBOR(r); err != nil {
return nil, err
}

Check warning on line 35 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L34-L35

Added lines #L34 - L35 were not covered by tests
return &msg, nil
}

// zstdGMessageEncoding is a gMessageEncoding that CBOR-encodes GMessages and
// then compresses the result with zstd; decoding reverses the two steps.
type zstdGMessageEncoding struct {
// cborEncoding performs the inner CBOR (de)serialisation; its zero value is usable.
cborEncoding cborGMessageEncoding
// compressor is used for one-shot compression via EncodeAll.
compressor *zstd.Encoder
// decompressor is used for one-shot decompression via DecodeAll.
decompressor *zstd.Decoder
}

func newZstdGMessageEncoding() (*zstdGMessageEncoding, error) {
writer, err := zstd.NewWriter(nil)
if err != nil {
return nil, err
}

Check warning on line 49 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L48-L49

Added lines #L48 - L49 were not covered by tests
reader, err := zstd.NewReader(nil)
if err != nil {
return nil, err
}

Check warning on line 53 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L52-L53

Added lines #L52 - L53 were not covered by tests
return &zstdGMessageEncoding{
compressor: writer,
decompressor: reader,
}, nil
}

func (c *zstdGMessageEncoding) Encode(m *gpbft.GMessage) ([]byte, error) {
cborEncoded, err := c.cborEncoding.Encode(m)
if err != nil {
return nil, err
}

Check warning on line 64 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L63-L64

Added lines #L63 - L64 were not covered by tests
compressed := c.compressor.EncodeAll(cborEncoded, make([]byte, 0, len(cborEncoded)))
return compressed, err
}

// Decode decompresses v with zstd and parses the result as a CBOR-encoded
// GMessage.
//
// It returns the decoded message, or nil together with the decompression or
// CBOR decoding error.
func (c *zstdGMessageEncoding) Decode(v []byte) (*gpbft.GMessage, error) {
// The destination starts with capacity len(v) and is grown by DecodeAll as
// needed. The decoded size is limited only by whatever options the
// decompressor was constructed with.
cborEncoded, err := c.decompressor.DecodeAll(v, make([]byte, 0, len(v)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of interest - have you tried variations of the dst argument here? nil or oversizing by a % given your knowledge of the likely compression ratio?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have. My plan is to introduce pooling of buffers in a separate PR as it is an optimisation. There are a few places in encoding pipeline that we can benefit from it to reduce allocations.

In terms of impact, this probably doesn't matter as much considering the heap allocation numbers from passive testing. It is just wasteful.

if err != nil {
return nil, err
}

Check warning on line 73 in msg_encoding.go

View check run for this annotation

Codecov / codecov/patch

msg_encoding.go#L72-L73

Added lines #L72 - L73 were not covered by tests
return c.cborEncoding.Decode(cborEncoded)
}
Loading
Loading