Skip to content

Commit

Permalink
Merge pull request #12 from maxpert/compression
Browse files Browse the repository at this point in the history
Adding compression support
  • Loading branch information
maxpert authored Oct 4, 2022
2 parents 917ae7b + 38fd867 commit ec4bd88
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 11 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/doug-martin/goqu/v9 v9.18.0
github.com/fsnotify/fsnotify v1.5.4
github.com/fxamacker/cbor/v2 v2.4.0
github.com/klauspost/compress v1.15.10
github.com/mattn/go-sqlite3 v1.14.15
github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0
github.com/rs/zerolog v1.27.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/klauspost/compress v1.15.10 h1:Ai8UzuomSCDw90e1qNMtb15msBXsNpH6gzkkENQNcJo=
github.com/klauspost/compress v1.15.10/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/lib/pq v1.10.1 h1:6VXZrLU0jHBYyAqrSPa+MgPfnSvTPuMgK+k0o5kVFWo=
github.com/lib/pq v1.10.1/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
Expand Down
68 changes: 58 additions & 10 deletions logstream/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"time"

"github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)
Expand All @@ -18,14 +19,15 @@ var StreamNamePrefix = "marmot-changes"
var SubjectPrefix = "marmot-change-log"

type Replicator struct {
nodeID uint64
shards uint64
nodeID uint64
shards uint64
compressionEnabled bool

client *nats.Conn
streamMap map[uint64]nats.JetStream
}

func NewReplicator(nodeID uint64, natsServer string, shards uint64) (*Replicator, error) {
func NewReplicator(nodeID uint64, natsServer string, shards uint64, compress bool) (*Replicator, error) {
nc, err := nats.Connect(natsServer, nats.Name(nodeName(nodeID)))

if err != nil {
Expand All @@ -40,7 +42,7 @@ func NewReplicator(nodeID uint64, natsServer string, shards uint64) (*Replicator
return nil, err
}

streamCfg := makeShardConfig(shard, shards)
streamCfg := makeShardConfig(shard, shards, compress)
info, err := js.StreamInfo(streamCfg.Name)
if err == nats.ErrStreamNotFound {
log.Debug().Uint64("shard", shard).Msg("Creating stream")
Expand All @@ -51,11 +53,16 @@ func NewReplicator(nodeID uint64, natsServer string, shards uint64) (*Replicator
return nil, err
}

leader := ""
if info.Cluster != nil {
leader = info.Cluster.Leader
}

log.Debug().
Uint64("shard", shard).
Str("name", info.Config.Name).
Int("replicas", info.Config.Replicas).
Str("leader", info.Cluster.Leader).
Str("leader", leader).
Msg("Stream ready...")

if err != nil {
Expand All @@ -66,8 +73,9 @@ func NewReplicator(nodeID uint64, natsServer string, shards uint64) (*Replicator
}

return &Replicator{
client: nc,
nodeID: nodeID,
client: nc,
nodeID: nodeID,
compressionEnabled: compress,

shards: shards,
streamMap: streamMap,
Expand All @@ -81,6 +89,15 @@ func (r *Replicator) Publish(hash uint64, payload []byte) error {
log.Panic().Uint64("shard", shardID).Msg("Invalid shard")
}

if r.compressionEnabled {
compPayload, err := payloadCompress(payload)
if err != nil {
return err
}

payload = compPayload
}

ack, err := js.Publish(subjectName(shardID), payload)
if err != nil {
return err
Expand Down Expand Up @@ -115,8 +132,16 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error)
return err
}

payload := msg.Data
if r.compressionEnabled {
payload, err = payloadDecompress(msg.Data)
if err != nil {
return err
}
}

log.Debug().Str("sub", msg.Subject).Uint64("shard", shardID).Send()
err = callback(msg.Data)
err = callback(payload)
if err != nil {
if replRetry > maxReplicateRetries {
return err
Expand All @@ -138,8 +163,13 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error)
return nil
}

func makeShardConfig(shardID uint64, totalShards uint64) *nats.StreamConfig {
streamName := fmt.Sprintf("%s-%d-%d", StreamNamePrefix, totalShards, shardID)
func makeShardConfig(shardID uint64, totalShards uint64, compressed bool) *nats.StreamConfig {
compPostfix := ""
if compressed {
compPostfix = "-c"
}

streamName := fmt.Sprintf("%s%s-%d", StreamNamePrefix, compPostfix, shardID)
replicas := EntryReplicas
if replicas < 1 {
replicas = int(totalShards>>1) + 1
Expand Down Expand Up @@ -168,3 +198,21 @@ func nodeName(nodeID uint64) string {
func subjectName(shardID uint64) string {
return fmt.Sprintf("%s-%d", SubjectPrefix, shardID)
}

func payloadCompress(payload []byte) ([]byte, error) {
enc, err := zstd.NewWriter(nil)
if err != nil {
return nil, err
}

return enc.EncodeAll(payload, nil), nil
}

func payloadDecompress(payload []byte) ([]byte, error) {
dec, err := zstd.NewReader(nil)
if err != nil {
return nil, err
}

return dec.DecodeAll(payload, nil)
}
3 changes: 2 additions & 1 deletion marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func main() {
logReplicas := flag.Int("log-replicas", 1, "Number of copies to be committed for single change log")
subjectPrefix := flag.String("subject-prefix", "marmot-change-log", "Prefix for publish subjects")
streamPrefix := flag.String("stream-prefix", "marmot-changes", "Prefix for publish subjects")
enableCompress := flag.Bool("compress", false, "Enable message compression")
verbose := flag.Bool("verbose", false, "Log debug level")
flag.Parse()

Expand Down Expand Up @@ -60,7 +61,7 @@ func main() {
return
}

rep, err := logstream.NewReplicator(*nodeID, *natsAddr, *shards)
rep, err := logstream.NewReplicator(*nodeID, *natsAddr, *shards, *enableCompress)
if err != nil {
log.Panic().Err(err).Msg("Unable to connect")
}
Expand Down

0 comments on commit ec4bd88

Please sign in to comment.