From afd382f43903ab49d2b1207860c339f4c5aace24 Mon Sep 17 00:00:00 2001 From: Kinshuk Bairagi Date: Tue, 6 Sep 2022 12:15:47 +0530 Subject: [PATCH 1/4] Bump rocksdb to 7.5.3 --- Dockerfile | 6 +- go.mod | 2 +- go.sum | 18 ++--- internal/storage/rocksdb/metrics.go | 6 +- internal/storage/rocksdb/store.go | 104 +++++++++++++------------ internal/storage/rocksdb/store_test.go | 50 ++++++------ internal/storage/rocksdb/wb_iter.go | 50 ++++++------ 7 files changed, 121 insertions(+), 115 deletions(-) diff --git a/Dockerfile b/Dockerfile index aeacd027..92a7cc1a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,9 +15,9 @@ RUN apt-get update && \ RUN curl -fsSL https://github.com/facebook/zstd/releases/download/v1.4.4/zstd-1.4.4.tar.gz | tar xz \ && cd zstd-1.4.4 && make install -# Install RocksDB v6.5.3 -RUN curl -fsSL https://github.com/facebook/rocksdb/archive/v6.22.1.tar.gz | tar xz \ - && cd rocksdb-6.22.1 && make install +# Install RocksDB +RUN curl -fsSL https://github.com/facebook/rocksdb/archive/v7.5.3.tar.gz | tar xz \ + && cd rocksdb-7.5.3 && make install # Install GoLang RUN curl -fsSL https://dl.google.com/go/go1.18.1.linux-$(dpkg --print-architecture).tar.gz | tar xz \ diff --git a/go.mod b/go.mod index fee4a087..411515a6 100644 --- a/go.mod +++ b/go.mod @@ -10,13 +10,13 @@ require ( github.com/dgraph-io/badger/v3 v3.2103.1 github.com/dgraph-io/ristretto v0.1.0 github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect - github.com/flipkart-incubator/gorocksdb v0.0.0-20210920082714-1f7dcbb7b2e4 github.com/flipkart-incubator/nexus v0.0.0-20220725092354-3772bb325062 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 github.com/gorilla/mux v1.8.0 github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 github.com/kpango/fastime v1.0.16 + github.com/linxGnu/grocksdb v1.7.7 github.com/matttproud/golang_protobuf_extensions v1.0.1 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 diff --git a/go.sum b/go.sum index d8449dc2..a3302714 100644 --- a/go.sum +++ b/go.sum @@ -81,17 +81,9 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go. github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws= -github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0= -github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64= -github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A= -github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg= -github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpmbhCOZJ293Lz68O7PYrF2EzeiFMwCLk= -github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flipkart-incubator/gorocksdb v0.0.0-20210920082714-1f7dcbb7b2e4 h1:9zPLm5QKcBp5xUOBMWhRZPU+iwfl6xJtbSmbazmgy2g= -github.com/flipkart-incubator/gorocksdb v0.0.0-20210920082714-1f7dcbb7b2e4/go.mod h1:kvJSXc90Ifw0rxuTxEHKq6UH/7hQ/gd9RKCyD94ctJ0= github.com/flipkart-incubator/nexus v0.0.0-20220725092354-3772bb325062 h1:n5cwpgmvZk+YOTBGIgTaK1OQeFVdM550ZDdKOd778hw= github.com/flipkart-incubator/nexus v0.0.0-20220725092354-3772bb325062/go.mod h1:i5sm5XII1dOXLLjrwLmsHpZ5oHfDI328uCvtm6fhyAg= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -232,6 +224,8 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/linxGnu/grocksdb v1.7.7 h1:b6o8gagb4FL+P55qUzPchBR/C0u1lWjJOWQSWbhvTWg= +github.com/linxGnu/grocksdb v1.7.7/go.mod h1:0hTf+iA+GOr0jDX4CgIYyJZxqOH9XlBh6KVj8+zmF34= github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls= @@ -335,13 +329,16 @@ github.com/spf13/viper v1.10.1 h1:nuJZuYpG7gTj/XqiUwg8bA0cp1+M2mC3J4g5luUYBKk= github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8qy1rU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= @@ -439,5 +436,6 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/storage/rocksdb/metrics.go b/internal/storage/rocksdb/metrics.go index 23e4b540..7f9e527e 100644 --- a/internal/storage/rocksdb/metrics.go +++ b/internal/storage/rocksdb/metrics.go @@ -1,7 +1,7 @@ package rocksdb import ( - "github.com/flipkart-incubator/gorocksdb" + "github.com/linxGnu/grocksdb" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -11,7 +11,7 @@ type rocksDBCollector struct { memTableUnflushedGauge *prometheus.Desc memTableReadersTotalGauge *prometheus.Desc cacheTotalGauge *prometheus.Desc - db *gorocksdb.DB + db *grocksdb.DB lgr *zap.Logger } @@ -51,7 +51,7 @@ func (collector *rocksDBCollector) Describe(ch chan<- *prometheus.Desc) { //Collect implements required collect function for all promehteus collectors func (collector *rocksDBCollector) Collect(ch chan<- prometheus.Metric) { - memoryUsage, err := gorocksdb.GetApproximateMemoryUsageByType([]*gorocksdb.DB{collector.db}, nil) + memoryUsage, err := grocksdb.GetApproximateMemoryUsageByType([]*grocksdb.DB{collector.db}, nil) if err != nil { collector.lgr.Error("Failed to get rocksgb memory usage", zap.Error(err)) } else { diff --git a/internal/storage/rocksdb/store.go b/internal/storage/rocksdb/store.go index e3588e48..8bbd49e4 100644 --- a/internal/storage/rocksdb/store.go +++ b/internal/storage/rocksdb/store.go @@ -22,7 +22,7 @@ import ( "github.com/flipkart-incubator/dkv/internal/stats" "github.com/flipkart-incubator/dkv/internal/storage" "github.com/flipkart-incubator/dkv/pkg/serverpb" - "github.com/flipkart-incubator/gorocksdb" + "github.com/linxGnu/grocksdb" "go.uber.org/zap" "gopkg.in/ini.v1" ) @@ -37,10 +37,10 @@ type DB interface { } type rocksDB struct { - db *gorocksdb.DB - normalCF *gorocksdb.ColumnFamilyHandle - ttlCF *gorocksdb.ColumnFamilyHandle - optimTrxnDB *gorocksdb.OptimisticTransactionDB + db *grocksdb.DB + normalCF *grocksdb.ColumnFamilyHandle + ttlCF *grocksdb.ColumnFamilyHandle + optimTrxnDB *grocksdb.OptimisticTransactionDB opts *rocksDBOpts stat *storage.Stat @@ -50,11 +50,11 @@ type rocksDB struct { } type rocksDBOpts struct { - readOpts *gorocksdb.ReadOptions - writeOpts *gorocksdb.WriteOptions - blockTableOpts *gorocksdb.BlockBasedTableOptions - rocksDBOpts *gorocksdb.Options - restoreOpts *gorocksdb.RestoreOptions + readOpts *grocksdb.ReadOptions + writeOpts *grocksdb.WriteOptions + blockTableOpts *grocksdb.BlockBasedTableOptions + rocksDBOpts *grocksdb.Options + restoreOpts *grocksdb.RestoreOptions folderName string sstDirectory string lgr *zap.Logger @@ -120,7 +120,7 @@ func WithSSTDir(sstDir string) DBOption { func WithCacheSize(size uint64) DBOption { return func(opts *rocksDBOpts) { if size > 0 { - opts.blockTableOpts.SetBlockCache(gorocksdb.NewLRUCache(size)) + opts.blockTableOpts.SetBlockCache(grocksdb.NewLRUCache(size)) } else { opts.blockTableOpts.SetNoBlockCache(true) } @@ -141,7 +141,7 @@ func WithRocksDBConfig(iniFile string) DBOption { for key, val := range sectConf { fmt.Fprintf(&buff, "%s=%s;", key, val) } - if rdbOpts, err := gorocksdb.GetOptionsFromString(opts.rocksDBOpts, buff.String()); err != nil { + if rdbOpts, err := grocksdb.GetOptionsFromString(opts.rocksDBOpts, buff.String()); err != nil { panic(fmt.Errorf("unable to parge RocksDB configuration from given file: %s, error: %v", iniFile, err)) } else { opts.rocksDBOpts = rdbOpts @@ -184,16 +184,24 @@ func (m *ttlCompactionFilter) Filter(level int, key, val []byte) (remove bool, n return false, nil } +func (m *ttlCompactionFilter) SetIgnoreSnapshots(value bool) { + +} + +func (m *ttlCompactionFilter) Destroy() { + +} + func newOptions(dbFolder string) *rocksDBOpts { - bbto := gorocksdb.NewDefaultBlockBasedTableOptions() - opts := gorocksdb.NewDefaultOptions() + bbto := grocksdb.NewDefaultBlockBasedTableOptions() + opts := grocksdb.NewDefaultOptions() opts.SetCreateIfMissing(true) opts.SetCreateIfMissingColumnFamilies(true) opts.SetWALTtlSeconds(uint64(600)) opts.SetBlockBasedTableFactory(bbto) - rstOpts := gorocksdb.NewRestoreOptions() - wrOpts := gorocksdb.NewDefaultWriteOptions() - rdOpts := gorocksdb.NewDefaultReadOptions() + rstOpts := grocksdb.NewRestoreOptions() + wrOpts := grocksdb.NewDefaultWriteOptions() + rdOpts := grocksdb.NewDefaultReadOptions() cfNames := []string{"default", "ttl"} return &rocksDBOpts{ folderName: dbFolder, @@ -219,19 +227,19 @@ func (rdbOpts *rocksDBOpts) destroy() { func openStore(opts *rocksDBOpts) (*rocksDB, error) { normalOpts := opts.rocksDBOpts - ttlOpts, err := gorocksdb.GetOptionsFromString(normalOpts, "") + ttlOpts, err := grocksdb.GetOptionsFromString(normalOpts, "") if err != nil { return nil, err } ttlOpts.SetCompactionFilter(&ttlCompactionFilter{opts.lgr}) - optimTrxnDB, cfh, err := gorocksdb.OpenOptimisticTransactionDbColumnFamilies(opts.rocksDBOpts, - opts.folderName, opts.cfNames, []*gorocksdb.Options{normalOpts, ttlOpts}) + optimTrxnDB, cfh, err := grocksdb.OpenOptimisticTransactionDbColumnFamilies(opts.rocksDBOpts, + opts.folderName, opts.cfNames, []*grocksdb.Options{normalOpts, ttlOpts}) if err != nil { return nil, err } rocksdb := rocksDB{ - db: optimTrxnDB.GetBaseDb(), + db: optimTrxnDB.GetBaseDB(), normalCF: cfh[0], ttlCF: cfh[1], optimTrxnDB: optimTrxnDB, @@ -259,7 +267,7 @@ func (rdb *rocksDB) Compaction() error { case <-tick: // trigger a compaction rdb.opts.lgr.Info("Triggering RocksDB Compaction") - rdb.db.CompactRangeCF(rdb.ttlCF, gorocksdb.Range{nil, nil}) + rdb.db.CompactRangeCF(rdb.ttlCF, grocksdb.Range{nil, nil}) } } return nil @@ -314,7 +322,7 @@ func (rdb *rocksDB) Put(pairs ...*serverpb.KVPair) error { defer rdb.opts.statsCli.Timing(metricsPrefix+".latency.ms", time.Now()) defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(metricsLabel), time.Now()) - wb := gorocksdb.NewWriteBatch() + wb := grocksdb.NewWriteBatch() defer wb.Destroy() for _, kv := range pairs { if kv == nil { @@ -348,7 +356,7 @@ func (rdb *rocksDB) Delete(key []byte) error { defer rdb.opts.statsCli.Timing("rocksdb.delete.latency.ms", time.Now()) defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.Delete), time.Now()) - wb := gorocksdb.NewWriteBatch() + wb := grocksdb.NewWriteBatch() defer wb.Destroy() wb.DeleteCF(rdb.ttlCF, key) wb.Delete(key) @@ -376,7 +384,7 @@ func (rdb *rocksDB) CompareAndSet(key, expect, update []byte) (bool, error) { ro := rdb.opts.readOpts wo := rdb.opts.writeOpts - to := gorocksdb.NewDefaultOptimisticTransactionOptions() + to := grocksdb.NewDefaultOptimisticTransactionOptions() txn := rdb.optimTrxnDB.TransactionBegin(wo, to, nil) defer txn.Destroy() @@ -416,11 +424,11 @@ const ( snapshotLogSizeForFlush = 0 ) -func (rdb *rocksDB) generateSST(snap *gorocksdb.Snapshot, cf *gorocksdb.ColumnFamilyHandle, sstDir string) (*os.File, error) { +func (rdb *rocksDB) generateSST(snap *grocksdb.Snapshot, cf *grocksdb.ColumnFamilyHandle, sstDir string) (*os.File, error) { var fileName string - envOpts := gorocksdb.NewDefaultEnvOptions() - opts := gorocksdb.NewDefaultOptions() - sstWrtr := gorocksdb.NewSSTFileWriter(envOpts, opts) + envOpts := grocksdb.NewDefaultEnvOptions() + opts := grocksdb.NewDefaultOptions() + sstWrtr := grocksdb.NewSSTFileWriter(envOpts, opts) defer sstWrtr.Destroy() if fileName = sstDir + sstDefaultCF; cf == rdb.ttlCF { @@ -433,7 +441,7 @@ func (rdb *rocksDB) generateSST(snap *gorocksdb.Snapshot, cf *gorocksdb.ColumnFa } // TODO: Any options need to be set - readOpts := gorocksdb.NewDefaultReadOptions() + readOpts := grocksdb.NewDefaultReadOptions() defer readOpts.Destroy() readOpts.SetSnapshot(snap) @@ -586,7 +594,7 @@ func (rdb *rocksDB) BackupTo(folder string) error { // Retain only the latest backup in the given folder defer be.PurgeOldBackups(1) - return be.CreateNewBackupFlush(rdb.db, true) + return be.CreateNewBackupFlush(true) } const tempDirPrefix = "rocksdb-restore-" @@ -675,7 +683,7 @@ func (rdb *rocksDB) SaveChanges(changes []*serverpb.ChangeRecord) (uint64, error appldChngNum := uint64(0) for _, chng := range changes { - wb := gorocksdb.WriteBatchFrom(chng.SerialisedForm) + wb := grocksdb.WriteBatchFrom(chng.SerialisedForm) defer wb.Destroy() err := rdb.db.Write(rdb.opts.writeOpts, wb) if err != nil { @@ -689,11 +697,11 @@ func (rdb *rocksDB) SaveChanges(changes []*serverpb.ChangeRecord) (uint64, error type iter struct { iterOpts storage.IterationOptions - rdbIter *gorocksdb.Iterator + rdbIter *grocksdb.Iterator ttlCF bool } -func (rdb *rocksDB) newIterCF(readOpts *gorocksdb.ReadOptions, iterOpts storage.IterationOptions, cf *gorocksdb.ColumnFamilyHandle) *iter { +func (rdb *rocksDB) newIterCF(readOpts *grocksdb.ReadOptions, iterOpts storage.IterationOptions, cf *grocksdb.ColumnFamilyHandle) *iter { it := rdb.db.NewIteratorCF(readOpts, cf) if sk, present := iterOpts.StartKey(); present { it.Seek(sk) @@ -769,7 +777,7 @@ func (rdb *rocksDB) Iterate(iterOpts storage.IterationOptions) storage.Iterator return iterators.Concat(baseIter, ttlIter) } -func (rdb *rocksDB) toChangeRecord(writeBatch *gorocksdb.WriteBatch, changeNum uint64) *serverpb.ChangeRecord { +func (rdb *rocksDB) toChangeRecord(writeBatch *grocksdb.WriteBatch, changeNum uint64) *serverpb.ChangeRecord { chngRec := &serverpb.ChangeRecord{} chngRec.ChangeNumber = changeNum dataBts := writeBatch.Data() @@ -786,21 +794,21 @@ func (rdb *rocksDB) toChangeRecord(writeBatch *gorocksdb.WriteBatch, changeNum u return chngRec } -func (rdb *rocksDB) openBackupEngine(folder string) (*gorocksdb.BackupEngine, error) { +func (rdb *rocksDB) openBackupEngine(folder string) (*grocksdb.BackupEngine, error) { opts := rdb.opts.rocksDBOpts - return gorocksdb.OpenBackupEngine(opts, folder) + return grocksdb.OpenBackupEngine(opts, folder) } -func (rdb *rocksDB) toTrxnRecord(wbr *gorocksdb.WriteBatchRecord) *serverpb.TrxnRecord { +func (rdb *rocksDB) toTrxnRecord(wbr *grocksdb.WriteBatchRecord) *serverpb.TrxnRecord { trxnRec := &serverpb.TrxnRecord{} switch wbr.Type { - case gorocksdb.WriteBatchCFDeletionRecord: + case grocksdb.WriteBatchCFDeletionRecord: trxnRec.Type = serverpb.TrxnRecord_Delete - case gorocksdb.WriteBatchDeletionRecord: + case grocksdb.WriteBatchDeletionRecord: trxnRec.Type = serverpb.TrxnRecord_Delete - case gorocksdb.WriteBatchValueRecord: + case grocksdb.WriteBatchValueRecord: trxnRec.Type = serverpb.TrxnRecord_Put - case gorocksdb.WriteBatchCFValueRecord: + case grocksdb.WriteBatchCFValueRecord: trxnRec.Type = serverpb.TrxnRecord_Put default: trxnRec.Type = serverpb.TrxnRecord_Unknown @@ -824,7 +832,7 @@ func byteArrayCopy(src []byte, dstLen int) []byte { return dst } -func toByteArray(value *gorocksdb.Slice) []byte { +func toByteArray(value *grocksdb.Slice) []byte { src := value.Data() res := byteArrayCopy(src, value.Size()) return res @@ -839,11 +847,11 @@ func parseTTLMsgPackData(valueWithTTL []byte) (*ttlDataFormat, error) { return &row, err } -func (rdb *rocksDB) getSingleKey(ro *gorocksdb.ReadOptions, key []byte) ([]*serverpb.KVPair, error) { +func (rdb *rocksDB) getSingleKey(ro *grocksdb.ReadOptions, key []byte) ([]*serverpb.KVPair, error) { defer rdb.opts.statsCli.Timing("rocksdb.single.get.latency.ms", time.Now()) defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.Get), time.Now()) - values, err := rdb.db.MultiGetCFMultiCF(ro, []*gorocksdb.ColumnFamilyHandle{rdb.normalCF, rdb.ttlCF}, [][]byte{key, key}) + values, err := rdb.db.MultiGetCFMultiCF(ro, []*grocksdb.ColumnFamilyHandle{rdb.normalCF, rdb.ttlCF}, [][]byte{key, key}) if err != nil { rdb.opts.statsCli.Incr("rocksdb.single.get.errors", 1) rdb.stat.ResponseError.WithLabelValues(stats.Get).Inc() @@ -859,7 +867,7 @@ func (rdb *rocksDB) getSingleKey(ro *gorocksdb.ReadOptions, key []byte) ([]*serv return nil, nil } -func (rdb *rocksDB) extractResult(value1 *gorocksdb.Slice, value2 *gorocksdb.Slice, key []byte) *serverpb.KVPair { +func (rdb *rocksDB) extractResult(value1 *grocksdb.Slice, value2 *grocksdb.Slice, key []byte) *serverpb.KVPair { if value1.Size() > 0 { //non ttl use-case val := toByteArray(value1) @@ -887,12 +895,12 @@ func (rdb *rocksDB) extractResult(value1 *gorocksdb.Slice, value2 *gorocksdb.Sli return nil } -func (rdb *rocksDB) getMultipleKeys(ro *gorocksdb.ReadOptions, keys [][]byte) ([]*serverpb.KVPair, error) { +func (rdb *rocksDB) getMultipleKeys(ro *grocksdb.ReadOptions, keys [][]byte) ([]*serverpb.KVPair, error) { defer rdb.opts.statsCli.Timing("rocksdb.multi.get.latency.ms", time.Now()) defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.MultiGet), time.Now()) kl := len(keys) - reqCFs := make([]*gorocksdb.ColumnFamilyHandle, kl<<1) + reqCFs := make([]*grocksdb.ColumnFamilyHandle, kl<<1) for i := 0; i < kl; i++ { reqCFs[i] = rdb.normalCF reqCFs[i+kl] = rdb.ttlCF diff --git a/internal/storage/rocksdb/store_test.go b/internal/storage/rocksdb/store_test.go index 6113a575..69fbe654 100644 --- a/internal/storage/rocksdb/store_test.go +++ b/internal/storage/rocksdb/store_test.go @@ -20,7 +20,7 @@ import ( "github.com/flipkart-incubator/dkv/internal/storage" "github.com/flipkart-incubator/dkv/pkg/serverpb" - "github.com/flipkart-incubator/gorocksdb" + "github.com/linxGnu/grocksdb" ) const ( @@ -172,7 +172,7 @@ func TestCompactionFilterOnExpiredKeys(t *testing.T) { } } - store.db.CompactRangeCF(store.ttlCF, gorocksdb.Range{nil, nil}) + store.db.CompactRangeCF(store.ttlCF, grocksdb.Range{nil, nil}) for i := 1; i <= numKeys; i++ { key := fmt.Sprintf("%s_%d", keyPref, i) if value, err := store.db.GetCF(store.opts.readOpts, store.ttlCF, []byte(key)); err != nil { @@ -344,7 +344,7 @@ func TestSaveChanges(t *testing.T) { wbPutKeyPrefix, wbPutValPrefix := "ddKey", "ddVal" chngs := make([]*serverpb.ChangeRecord, numTrxns) for i := 0; i < numTrxns; i++ { - wb := gorocksdb.NewWriteBatch() + wb := grocksdb.NewWriteBatch() defer wb.Destroy() ks, vs := fmt.Sprintf("%s_%d", wbPutKeyPrefix, i+1), fmt.Sprintf("%s_%d", wbPutValPrefix, i+1) wb.Put([]byte(ks), []byte(vs)) @@ -511,10 +511,10 @@ func TestGetUpdatesFromSeqNumForBatches(t *testing.T) { expNumTrxns := expNumBatchTrxns * numTrxnsPerBatch for i := 1; i <= expNumBatchTrxns; i++ { k, v := fmt.Sprintf("bKey_%d", i), fmt.Sprintf("bVal_%d", i) - wb := gorocksdb.NewWriteBatch() + wb := grocksdb.NewWriteBatch() wb.Put([]byte(k), []byte(v)) wb.Delete([]byte(k)) - wo := gorocksdb.NewDefaultWriteOptions() + wo := grocksdb.NewDefaultWriteOptions() wo.SetSync(true) if err := store.db.Write(wo, wb); err != nil { t.Fatal(err) @@ -705,7 +705,7 @@ func TestIterationOnExplicitSnapshot(t *testing.T) { keyPrefix2, valPrefix2 := "secKey", "secVal" putKeys(t, numTrxns, keyPrefix2, valPrefix2, 0) - readOpts := gorocksdb.NewDefaultReadOptions() + readOpts := grocksdb.NewDefaultReadOptions() defer readOpts.Destroy() readOpts.SetSnapshot(snap) @@ -885,11 +885,11 @@ func TestAtomicIncrDecr(t *testing.T) { func TestLoadChangesForOptimisticTransactions(t *testing.T) { name := fmt.Sprintf("%s-TestChngsOptimTrans", store.opts.folderName) opts := store.opts.rocksDBOpts - ro := gorocksdb.NewDefaultReadOptions() - wo := gorocksdb.NewDefaultWriteOptions() - to := gorocksdb.NewDefaultOptimisticTransactionOptions() + ro := grocksdb.NewDefaultReadOptions() + wo := grocksdb.NewDefaultWriteOptions() + to := grocksdb.NewDefaultOptimisticTransactionOptions() - tdb, err := gorocksdb.OpenOptimisticTransactionDb(opts, name) + tdb, err := grocksdb.OpenOptimisticTransactionDb(opts, name) if err != nil { t.Errorf("Unable to open optimistic transaction DB. Error: %v", err) } @@ -977,12 +977,12 @@ func TestLoadChangesForOptimisticTransactions(t *testing.T) { func TestPessimisticTransactions(t *testing.T) { name := fmt.Sprintf("%s-TestPessTrans", store.opts.folderName) opts := store.opts.rocksDBOpts - ro := gorocksdb.NewDefaultReadOptions() - wo := gorocksdb.NewDefaultWriteOptions() - tdbo := gorocksdb.NewDefaultTransactionDBOptions() - to := gorocksdb.NewDefaultTransactionOptions() + ro := grocksdb.NewDefaultReadOptions() + wo := grocksdb.NewDefaultWriteOptions() + tdbo := grocksdb.NewDefaultTransactionDBOptions() + to := grocksdb.NewDefaultTransactionOptions() - tdb, err := gorocksdb.OpenTransactionDb(opts, tdbo, name) + tdb, err := grocksdb.OpenTransactionDb(opts, tdbo, name) if err != nil { t.Errorf("Unable to open transaction DB. Error: %v", err) } @@ -1059,10 +1059,10 @@ func BenchmarkMergeOperators(b *testing.B) { name := fmt.Sprintf("%s-BenchMergeOpers", store.opts.folderName) opts := store.opts.rocksDBOpts opts.SetMergeOperator(&IncOp{}) - ro := gorocksdb.NewDefaultReadOptions() - wo := gorocksdb.NewDefaultWriteOptions() + ro := grocksdb.NewDefaultReadOptions() + wo := grocksdb.NewDefaultWriteOptions() - db, err := gorocksdb.OpenDb(opts, name) + db, err := grocksdb.OpenDb(opts, name) if err != nil { b.Errorf("Unable to open DB. Error: %v", err) } @@ -1079,7 +1079,7 @@ func BenchmarkMergeOperators(b *testing.B) { if err != nil { b.Errorf("Unable to merge. Error: %v", err) } - db.CompactRange(gorocksdb.Range{nil, nil}) + db.CompactRange(grocksdb.Range{nil, nil}) } cnt, err := db.Get(ro, ctrKey) defer cnt.Free() @@ -1124,12 +1124,12 @@ func BenchmarkCompareAndSet(b *testing.B) { func BenchmarkPessimisticTransactions(b *testing.B) { name := fmt.Sprintf("%s-BenchPessTrans", store.opts.folderName) opts := store.opts.rocksDBOpts - ro := gorocksdb.NewDefaultReadOptions() - wo := gorocksdb.NewDefaultWriteOptions() - tdbo := gorocksdb.NewDefaultTransactionDBOptions() - to := gorocksdb.NewDefaultTransactionOptions() + ro := grocksdb.NewDefaultReadOptions() + wo := grocksdb.NewDefaultWriteOptions() + tdbo := grocksdb.NewDefaultTransactionDBOptions() + to := grocksdb.NewDefaultTransactionOptions() - tdb, err := gorocksdb.OpenTransactionDb(opts, tdbo, name) + tdb, err := grocksdb.OpenTransactionDb(opts, tdbo, name) if err != nil { b.Errorf("Unable to open transaction DB. Error: %v", err) } @@ -1218,7 +1218,7 @@ func BenchmarkIteration(b *testing.B) { snap := store.db.NewSnapshot() defer store.db.ReleaseSnapshot(snap) - readOpts := gorocksdb.NewDefaultReadOptions() + readOpts := grocksdb.NewDefaultReadOptions() defer readOpts.Destroy() readOpts.SetSnapshot(snap) diff --git a/internal/storage/rocksdb/wb_iter.go b/internal/storage/rocksdb/wb_iter.go index 377fcf2a..f0a7bc75 100644 --- a/internal/storage/rocksdb/wb_iter.go +++ b/internal/storage/rocksdb/wb_iter.go @@ -2,7 +2,7 @@ package rocksdb import ( "errors" - "github.com/flipkart-incubator/gorocksdb" + "github.com/linxGnu/grocksdb" "io" ) @@ -18,7 +18,7 @@ func NewWriteBatchIterator(wbData []byte) *WriteBatchIterator { // WriteBatchIterator represents a iterator to iterator over records. type WriteBatchIterator struct { data []byte - record gorocksdb.WriteBatchRecord + record grocksdb.WriteBatchRecord err error } @@ -38,30 +38,30 @@ func (iter *WriteBatchIterator) Next() bool { switch iter.record.Type { case - gorocksdb.WriteBatchDeletionRecord, - gorocksdb.WriteBatchSingleDeletionRecord: + grocksdb.WriteBatchDeletionRecord, + grocksdb.WriteBatchSingleDeletionRecord: iter.record.Key = iter.decodeSlice() case - gorocksdb.WriteBatchCFDeletionRecord, - gorocksdb.WriteBatchCFSingleDeletionRecord: + grocksdb.WriteBatchCFDeletionRecord, + grocksdb.WriteBatchCFSingleDeletionRecord: iter.record.CF = int(iter.decodeVarint()) if iter.err == nil { iter.record.Key = iter.decodeSlice() } case - gorocksdb.WriteBatchValueRecord, - gorocksdb.WriteBatchMergeRecord, - gorocksdb.WriteBatchRangeDeletion, - gorocksdb.WriteBatchBlobIndex: + grocksdb.WriteBatchValueRecord, + grocksdb.WriteBatchMergeRecord, + grocksdb.WriteBatchRangeDeletion, + grocksdb.WriteBatchBlobIndex: iter.record.Key = iter.decodeSlice() if iter.err == nil { iter.record.Value = iter.decodeSlice() } case - gorocksdb.WriteBatchCFValueRecord, - gorocksdb.WriteBatchCFRangeDeletion, - gorocksdb.WriteBatchCFMergeRecord, - gorocksdb.WriteBatchCFBlobIndex: + grocksdb.WriteBatchCFValueRecord, + grocksdb.WriteBatchCFRangeDeletion, + grocksdb.WriteBatchCFMergeRecord, + grocksdb.WriteBatchCFBlobIndex: iter.record.CF = int(iter.decodeVarint()) if iter.err == nil { iter.record.Key = iter.decodeSlice() @@ -69,16 +69,16 @@ func (iter *WriteBatchIterator) Next() bool { if iter.err == nil { iter.record.Value = iter.decodeSlice() } - case gorocksdb.WriteBatchLogDataRecord: + case grocksdb.WriteBatchLogDataRecord: iter.record.Value = iter.decodeSlice() case - gorocksdb.WriteBatchNoopRecord, - gorocksdb.WriteBatchBeginPrepareXIDRecord, - gorocksdb.WriteBatchBeginPersistedPrepareXIDRecord: + grocksdb.WriteBatchNoopRecord, + grocksdb.WriteBatchBeginPrepareXIDRecord, + grocksdb.WriteBatchBeginPersistedPrepareXIDRecord: case - gorocksdb.WriteBatchEndPrepareXIDRecord, - gorocksdb.WriteBatchCommitXIDRecord, - gorocksdb.WriteBatchRollbackXIDRecord: + grocksdb.WriteBatchEndPrepareXIDRecord, + grocksdb.WriteBatchCommitXIDRecord, + grocksdb.WriteBatchRollbackXIDRecord: iter.record.Value = iter.decodeSlice() default: iter.err = errors.New("unsupported wal record type") @@ -89,7 +89,7 @@ func (iter *WriteBatchIterator) Next() bool { } // Record returns the current record. -func (iter *WriteBatchIterator) Record() *gorocksdb.WriteBatchRecord { +func (iter *WriteBatchIterator) Record() *grocksdb.WriteBatchRecord { return &iter.record } @@ -111,14 +111,14 @@ func (iter *WriteBatchIterator) decodeSlice() []byte { return ret } -func (iter *WriteBatchIterator) decodeRecType() gorocksdb.WriteBatchRecordType { +func (iter *WriteBatchIterator) decodeRecType() grocksdb.WriteBatchRecordType { if len(iter.data) == 0 { iter.err = io.ErrShortBuffer - return gorocksdb.WriteBatchNotUsedRecord + return grocksdb.WriteBatchNotUsedRecord } t := iter.data[0] iter.data = iter.data[1:] - return gorocksdb.WriteBatchRecordType(t) + return grocksdb.WriteBatchRecordType(t) } func (iter *WriteBatchIterator) decodeVarint() uint64 { From 35355030d3449372f5fa9c8ef8c3681b68084a50 Mon Sep 17 00:00:00 2001 From: Kinshuk Bairagi Date: Tue, 6 Sep 2022 13:34:29 +0530 Subject: [PATCH 2/4] Update store_test.go --- internal/storage/rocksdb/store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/storage/rocksdb/store_test.go b/internal/storage/rocksdb/store_test.go index 69fbe654..e898b8cc 100644 --- a/internal/storage/rocksdb/store_test.go +++ b/internal/storage/rocksdb/store_test.go @@ -896,7 +896,7 @@ func TestLoadChangesForOptimisticTransactions(t *testing.T) { defer tdb.Close() ctrKey := []byte("num") - bdb := tdb.GetBaseDb() + bdb := tdb.GetBaseDB() err = bdb.Put(wo, ctrKey, []byte{0}) if err != nil { t.Errorf("Unable to PUT using base DB of optimistic transaction. Error: %v", err) From d91a741efef6b9323dfd52d241704130a74c2710 Mon Sep 17 00:00:00 2001 From: Kinshuk Bairagi Date: Tue, 6 Sep 2022 14:42:24 +0530 Subject: [PATCH 3/4] Update store.go --- internal/storage/rocksdb/store.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/storage/rocksdb/store.go b/internal/storage/rocksdb/store.go index 8bbd49e4..03439e52 100644 --- a/internal/storage/rocksdb/store.go +++ b/internal/storage/rocksdb/store.go @@ -795,8 +795,7 @@ func (rdb *rocksDB) toChangeRecord(writeBatch *grocksdb.WriteBatch, changeNum ui } func (rdb *rocksDB) openBackupEngine(folder string) (*grocksdb.BackupEngine, error) { - opts := rdb.opts.rocksDBOpts - return grocksdb.OpenBackupEngine(opts, folder) + return grocksdb.CreateBackupEngine(rdb.db, folder) } func (rdb *rocksDB) toTrxnRecord(wbr *grocksdb.WriteBatchRecord) *serverpb.TrxnRecord { From 6c6b5bacf6cbd7001d2e4d071de23f296bd9e0d5 Mon Sep 17 00:00:00 2001 From: Kinshuk Bairagi Date: Wed, 7 Sep 2022 14:08:00 +0530 Subject: [PATCH 4/4] fix backup npe --- go.mod | 2 +- go.sum | 4 ++-- internal/storage/rocksdb/store.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 411515a6..0e5f7e74 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 github.com/kpango/fastime v1.0.16 - github.com/linxGnu/grocksdb v1.7.7 + github.com/linxGnu/grocksdb v1.7.8-0.20220907063002-2aa505c75b25 github.com/matttproud/golang_protobuf_extensions v1.0.1 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 diff --git a/go.sum b/go.sum index a3302714..57e462e5 100644 --- a/go.sum +++ b/go.sum @@ -224,8 +224,8 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/linxGnu/grocksdb v1.7.7 h1:b6o8gagb4FL+P55qUzPchBR/C0u1lWjJOWQSWbhvTWg= -github.com/linxGnu/grocksdb v1.7.7/go.mod h1:0hTf+iA+GOr0jDX4CgIYyJZxqOH9XlBh6KVj8+zmF34= +github.com/linxGnu/grocksdb v1.7.8-0.20220907063002-2aa505c75b25 h1:3+Su9AzsfHQZb1h0nVuiPuG7Tgb0LW/mGyvMHAMZMK0= +github.com/linxGnu/grocksdb v1.7.8-0.20220907063002-2aa505c75b25/go.mod h1:0hTf+iA+GOr0jDX4CgIYyJZxqOH9XlBh6KVj8+zmF34= github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls= diff --git a/internal/storage/rocksdb/store.go b/internal/storage/rocksdb/store.go index 03439e52..eac0e15d 100644 --- a/internal/storage/rocksdb/store.go +++ b/internal/storage/rocksdb/store.go @@ -795,7 +795,7 @@ func (rdb *rocksDB) toChangeRecord(writeBatch *grocksdb.WriteBatch, changeNum ui } func (rdb *rocksDB) openBackupEngine(folder string) (*grocksdb.BackupEngine, error) { - return grocksdb.CreateBackupEngine(rdb.db, folder) + return grocksdb.CreateBackupEngineWithPath(rdb.db, folder) } func (rdb *rocksDB) toTrxnRecord(wbr *grocksdb.WriteBatchRecord) *serverpb.TrxnRecord {