From af31da2405122d8dca72cfad3f60e9030ea1dc89 Mon Sep 17 00:00:00 2001 From: Zohaib Date: Sat, 1 Oct 2022 06:46:18 -0700 Subject: [PATCH 1/4] Adding compression support --- db/change_log.go | 2 +- logstream/replicator.go | 68 +++++++++++++++++++++++++++++++++++++---- marmot.go | 5 +-- 3 files changed, 66 insertions(+), 9 deletions(-) diff --git a/db/change_log.go b/db/change_log.go index 2f6dc79..46bf36b 100644 --- a/db/change_log.go +++ b/db/change_log.go @@ -66,7 +66,7 @@ func (conn *SqliteStreamDB) Replicate(event *ChangeLogEvent) error { return nil } -func (conn *SqliteStreamDB) DeleteChangeLog(event *ChangeLogEvent) (bool, error) { +func (conn *SqliteStreamDB) DeletePublishedLog(event *ChangeLogEvent) (bool, error) { metaTableName := conn.metaTable(event.TableName, changeLogName) rs, err := conn.Delete(metaTableName). Where(goqu.Ex{ diff --git a/logstream/replicator.go b/logstream/replicator.go index 3475d8f..80df25e 100644 --- a/logstream/replicator.go +++ b/logstream/replicator.go @@ -1,9 +1,11 @@ package logstream import ( + "bytes" "fmt" "time" + "github.com/klauspost/compress/zstd" "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" ) @@ -18,14 +20,15 @@ var StreamNamePrefix = "marmot-changes" var SubjectPrefix = "marmot-change-log" type Replicator struct { - nodeID uint64 - shards uint64 + nodeID uint64 + shards uint64 + compressionEnabled bool client *nats.Conn streamMap map[uint64]nats.JetStream } -func NewReplicator(nodeID uint64, natsServer string, shards uint64) (*Replicator, error) { +func NewReplicator(nodeID uint64, natsServer string, shards uint64, compress bool) (*Replicator, error) { nc, err := nats.Connect(natsServer, nats.Name(nodeName(nodeID))) if err != nil { @@ -66,8 +69,9 @@ func NewReplicator(nodeID uint64, natsServer string, shards uint64) (*Replicator } return &Replicator{ - client: nc, - nodeID: nodeID, + client: nc, + nodeID: nodeID, + compressionEnabled: compress, shards: shards, streamMap: streamMap, @@ -81,6 +85,15 @@ func (r *Replicator) Publish(hash uint64, payload []byte) error { log.Panic().Uint64("shard", shardID).Msg("Invalid shard") } + if r.compressionEnabled { + compPayload, err := compress(payload) + if err != nil { + return err + } + + payload = compPayload + } + ack, err := js.Publish(subjectName(shardID), payload) if err != nil { return err @@ -115,8 +128,16 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error) return err } + payload := msg.Data + if r.compressionEnabled { + payload, err = decompress(msg.Data) + if err != nil { + return err + } + } + log.Debug().Str("sub", msg.Subject).Uint64("shard", shardID).Send() - err = callback(msg.Data) + err = callback(payload) if err != nil { if replRetry > maxReplicateRetries { return err @@ -168,3 +189,38 @@ func nodeName(nodeID uint64) string { func subjectName(shardID uint64) string { return fmt.Sprintf("%s-%d", SubjectPrefix, shardID) } + +func compress(payload []byte) ([]byte, error) { + buff := make([]byte, 0) + enc, err := zstd.NewWriter(bytes.NewBuffer(buff)) + if err != nil { + return nil, err + } + + _, err = enc.Write(payload) + if err != nil { + return nil, err + } + + err = enc.Flush() + if err != nil { + return nil, err + } + + return buff, nil +} + +func decompress(payload []byte) ([]byte, error) { + dec, err := zstd.NewReader(bytes.NewReader(payload)) + if err != nil { + return nil, err + } + + buff := make([]byte, 0) + _, err = dec.Read(buff) + if err != nil { + return nil, err + } + + return buff, nil +} diff --git a/marmot.go b/marmot.go index 98d3603..23d5330 100644 --- a/marmot.go +++ b/marmot.go @@ -22,6 +22,7 @@ func main() { logReplicas := flag.Int("log-replicas", 1, "Number of copies to be committed for single change log") subjectPrefix := flag.String("subject-prefix", "marmot-change-log", "Prefix for publish subjects") streamPrefix := flag.String("stream-prefix", "marmot-changes", "Prefix for publish subjects") + enableCompress := flag.Bool("compress", false, "Enable message compression") verbose := flag.Bool("verbose", false, "Log debug level") flag.Parse() @@ -60,7 +61,7 @@ func main() { return } - rep, err := logstream.NewReplicator(*nodeID, *natsAddr, *shards) + rep, err := logstream.NewReplicator(*nodeID, *natsAddr, *shards, *enableCompress) if err != nil { log.Panic().Err(err).Msg("Unable to connect") } @@ -115,7 +116,7 @@ func onChangeEvent(streamDB *db.SqliteStreamDB) func(data []byte) error { return err } - _, _ = streamDB.DeleteChangeLog(ev.Payload) + _, _ = streamDB.DeletePublishedLog(ev.Payload) return streamDB.Replicate(ev.Payload) } } From 389324d88881f85e350f29544dbb87d9d2cd7b18 Mon Sep 17 00:00:00 2001 From: Zohaib Date: Sat, 1 Oct 2022 07:12:03 -0700 Subject: [PATCH 2/4] Tidy up go.mod --- go.mod | 1 + go.sum | 1 + 2 files changed, 2 insertions(+) diff --git a/go.mod b/go.mod index 5a867c9..a8094ed 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/doug-martin/goqu/v9 v9.18.0 github.com/fsnotify/fsnotify v1.5.4 github.com/fxamacker/cbor/v2 v2.4.0 + github.com/klauspost/compress v1.15.10 github.com/mattn/go-sqlite3 v1.14.15 github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0 github.com/rs/zerolog v1.27.0 diff --git a/go.sum b/go.sum index a97a4b4..40151b7 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,7 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/klauspost/compress v1.15.10 h1:Ai8UzuomSCDw90e1qNMtb15msBXsNpH6gzkkENQNcJo= +github.com/klauspost/compress v1.15.10/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/lib/pq v1.10.1 h1:6VXZrLU0jHBYyAqrSPa+MgPfnSvTPuMgK+k0o5kVFWo= github.com/lib/pq v1.10.1/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= From c9ccf0e9f442ae189c37395d9f6470e6feb0f013 Mon Sep 17 00:00:00 2001 From: Zohaib Date: Sat, 1 Oct 2022 07:54:23 -0700 Subject: [PATCH 3/4] Simplified zstd compress/decompress --- logstream/replicator.go | 41 ++++++++++++++--------------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/logstream/replicator.go b/logstream/replicator.go index 80df25e..2c3cdf4 100644 --- a/logstream/replicator.go +++ b/logstream/replicator.go @@ -1,7 +1,6 @@ package logstream import ( - "bytes" "fmt" "time" @@ -54,11 +53,16 @@ func NewReplicator(nodeID uint64, natsServer string, shards uint64, compress boo return nil, err } + leader := "" + if info.Cluster != nil { + leader = info.Cluster.Leader + } + log.Debug(). Uint64("shard", shard). Str("name", info.Config.Name). Int("replicas", info.Config.Replicas). - Str("leader", info.Cluster.Leader). + Str("leader", leader). Msg("Stream ready...") if err != nil { @@ -86,7 +90,7 @@ func (r *Replicator) Publish(hash uint64, payload []byte) error { } if r.compressionEnabled { - compPayload, err := compress(payload) + compPayload, err := payloadCompress(payload) if err != nil { return err } @@ -130,7 +134,7 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error) payload := msg.Data if r.compressionEnabled { - payload, err = decompress(msg.Data) + payload, err = payloadDecompress(msg.Data) if err != nil { return err } @@ -190,37 +194,20 @@ func subjectName(shardID uint64) string { return fmt.Sprintf("%s-%d", SubjectPrefix, shardID) } -func compress(payload []byte) ([]byte, error) { - buff := make([]byte, 0) - enc, err := zstd.NewWriter(bytes.NewBuffer(buff)) - if err != nil { - return nil, err - } - - _, err = enc.Write(payload) - if err != nil { - return nil, err - } - - err = enc.Flush() +func payloadCompress(payload []byte) ([]byte, error) { + enc, err := zstd.NewWriter(nil) if err != nil { return nil, err } - return buff, nil + return enc.EncodeAll(payload, nil), nil } -func decompress(payload []byte) ([]byte, error) { - dec, err := zstd.NewReader(bytes.NewReader(payload)) - if err != nil { - return nil, err - } - - buff := make([]byte, 0) - _, err = dec.Read(buff) +func payloadDecompress(payload []byte) ([]byte, error) { + dec, err := zstd.NewReader(nil) if err != nil { return nil, err } - return buff, nil + return dec.DecodeAll(payload, nil) } From ddb080444cace4c1d0a084d5cf4707cd318ee279 Mon Sep 17 00:00:00 2001 From: Zohaib Date: Sat, 1 Oct 2022 08:02:19 -0700 Subject: [PATCH 4/4] Contextualizing shard config --- logstream/replicator.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/logstream/replicator.go b/logstream/replicator.go index 2c3cdf4..75c8a8b 100644 --- a/logstream/replicator.go +++ b/logstream/replicator.go @@ -42,7 +42,7 @@ func NewReplicator(nodeID uint64, natsServer string, shards uint64, compress boo return nil, err } - streamCfg := makeShardConfig(shard, shards) + streamCfg := makeShardConfig(shard, shards, compress) info, err := js.StreamInfo(streamCfg.Name) if err == nats.ErrStreamNotFound { log.Debug().Uint64("shard", shard).Msg("Creating stream") @@ -163,8 +163,13 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error) return nil } -func makeShardConfig(shardID uint64, totalShards uint64) *nats.StreamConfig { - streamName := fmt.Sprintf("%s-%d-%d", StreamNamePrefix, totalShards, shardID) +func makeShardConfig(shardID uint64, totalShards uint64, compressed bool) *nats.StreamConfig { + compPostfix := "" + if compressed { + compPostfix = "-c" + } + + streamName := fmt.Sprintf("%s%s-%d", StreamNamePrefix, compPostfix, shardID) replicas := EntryReplicas if replicas < 1 { replicas = int(totalShards>>1) + 1