diff --git a/.travis.yml b/.travis.yml index 79a7bb6c..9b331467 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,8 @@ +dist: xenial language: go go: - - 1.11 + - 1.12.x + - 1.13.x - tip before_install: diff --git a/backup.go b/backup.go index a6673ff8..87621dd9 100644 --- a/backup.go +++ b/backup.go @@ -89,7 +89,7 @@ func OpenBackupEngine(opts *Options, path string) (*BackupEngine, error) { be := C.rocksdb_backup_engine_open(opts.c, cpath, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &BackupEngine{ @@ -104,19 +104,25 @@ func (b *BackupEngine) UnsafeGetBackupEngine() unsafe.Pointer { return unsafe.Pointer(b.c) } -// CreateNewBackup takes a new backup from db. -func (b *BackupEngine) CreateNewBackup(db *DB) error { +// CreateNewBackupFlush takes a new backup from db. If flush is set to true, +// it flushes the WAL before taking the backup. +func (b *BackupEngine) CreateNewBackupFlush(db *DB, flush bool) error { var cErr *C.char - C.rocksdb_backup_engine_create_new_backup(b.c, db.c, &cErr) + C.rocksdb_backup_engine_create_new_backup_flush(b.c, db.c, boolToChar(flush), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil } +// CreateNewBackup takes a new backup from db. +func (b *BackupEngine) CreateNewBackup(db *DB) error { + return b.CreateNewBackupFlush(db, false) +} + // GetInfo gets an object that gives information about // the backups that have already been taken func (b *BackupEngine) GetInfo() *BackupEngineInfo { @@ -138,7 +144,18 @@ func (b *BackupEngine) RestoreDBFromLatestBackup(dbDir, walDir string, ro *Resto C.rocksdb_backup_engine_restore_db_from_latest_backup(b.c, cDbDir, cWalDir, ro.c, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + +// PurgeOldBackups deletes all backups older than the latest 'n' backups +func (b *BackupEngine) PurgeOldBackups(n uint32) error { + var cErr *C.char + C.rocksdb_backup_engine_purge_old_backups(b.c, C.uint32_t(n), &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/cache.go b/cache.go index ed708d7f..866326dc 100644 --- a/cache.go +++ b/cache.go @@ -9,7 +9,7 @@ type Cache struct { } // NewLRUCache creates a new LRU Cache object with the capacity given. -func NewLRUCache(capacity int) *Cache { +func NewLRUCache(capacity uint64) *Cache { return NewNativeCache(C.rocksdb_cache_create_lru(C.size_t(capacity))) } @@ -19,13 +19,13 @@ func NewNativeCache(c *C.rocksdb_cache_t) *Cache { } // GetUsage returns the Cache memory usage. -func (c *Cache) GetUsage() int { - return int(C.rocksdb_cache_get_usage(c.c)) +func (c *Cache) GetUsage() uint64 { + return uint64(C.rocksdb_cache_get_usage(c.c)) } // GetPinnedUsage returns the Cache pinned memory usage. -func (c *Cache) GetPinnedUsage() int { - return int(C.rocksdb_cache_get_pinned_usage(c.c)) +func (c *Cache) GetPinnedUsage() uint64 { + return uint64(C.rocksdb_cache_get_pinned_usage(c.c)) } // Destroy deallocates the Cache object. 
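Usage note (editorial, not part of the diff): the backup.go hunks above add a flush-aware backup entry point and backup retention. Below is a minimal sketch of how the new calls might compose; the import path, backup directory, and retention count are illustrative assumptions, not taken from this diff.

```go
package main

import (
	"log"

	"github.com/tecbot/gorocksdb" // assumed import path for this binding
)

// backupAndTrim is a hypothetical helper: it takes a backup of db after
// flushing the memtable (so the backup does not depend on WAL files),
// then keeps only the five most recent backups.
func backupAndTrim(db *gorocksdb.DB) error {
	opts := gorocksdb.NewDefaultOptions()
	defer opts.Destroy()

	be, err := gorocksdb.OpenBackupEngine(opts, "/tmp/rocksdb-backups")
	if err != nil {
		return err
	}
	defer be.Close()

	// flush=true forces a flush before the backup is taken (new in this diff).
	if err := be.CreateNewBackupFlush(db, true); err != nil {
		return err
	}

	// Also new in this diff: delete everything except the latest five backups.
	return be.PurgeOldBackups(5)
}

func main() {
	// Opening a real DB is omitted here; backupAndTrim shows the call pattern.
	log.Println("see backupAndTrim for the sketch")
}
```

Note that CreateNewBackup keeps its old signature and now simply delegates to CreateNewBackupFlush(db, false), so existing callers are unaffected.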
diff --git a/checkpoint.go b/checkpoint.go index a7d2bf40..4a6436d2 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -43,7 +43,7 @@ func (checkpoint *Checkpoint) CreateCheckpoint(checkpoint_dir string, log_size_f C.rocksdb_checkpoint_create(checkpoint.c, cDir, C.uint64_t(log_size_for_flush), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/checkpoint_test.go b/checkpoint_test.go index 9505740d..1ea10fdb 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -1,10 +1,11 @@ package gorocksdb import ( - "github.com/facebookgo/ensure" "io/ioutil" "os" "testing" + + "github.com/facebookgo/ensure" ) func TestCheckpoint(t *testing.T) { diff --git a/db.go b/db.go old mode 100644 new mode 100755 index fdfc8186..d67fbff3 --- a/db.go +++ b/db.go @@ -32,7 +32,26 @@ func OpenDb(opts *Options, name string) (*DB, error) { defer C.free(unsafe.Pointer(cName)) db := C.rocksdb_open(opts.c, cName, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return &DB{ + name: name, + c: db, + opts: opts, + }, nil +} + +// OpenDbWithTTL opens a database with TTL support with the specified options. +func OpenDbWithTTL(opts *Options, name string, ttl int) (*DB, error) { + var ( + cErr *C.char + cName = C.CString(name) + ) + defer C.free(unsafe.Pointer(cName)) + db := C.rocksdb_open_with_ttl(opts.c, cName, C.int(ttl), &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &DB{ @@ -51,7 +70,7 @@ func OpenDbForReadOnly(opts *Options, name string, errorIfLogFileExist bool) (*D defer C.free(unsafe.Pointer(cName)) db := C.rocksdb_open_for_read_only(opts.c, cName, boolToChar(errorIfLogFileExist), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &DB{ @@ -104,7 +123,7 @@ func OpenDbColumnFamilies( &cErr, ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, nil, errors.New(C.GoString(cErr)) } @@ -166,7 +185,7 @@ func OpenDbForReadOnlyColumnFamilies( &cErr, ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, nil, errors.New(C.GoString(cErr)) } @@ -192,12 +211,15 @@ func ListColumnFamilies(opts *Options, name string) ([]string, error) { defer C.free(unsafe.Pointer(cName)) cNames := C.rocksdb_list_column_families(opts.c, cName, &cLen, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } namesLen := int(cLen) names := make([]string, namesLen) - cNamesArr := (*[1 << 30]*C.char)(unsafe.Pointer(cNames))[:namesLen:namesLen] + // The maximum capacity of the following two slices is limited to (2^29)-1 to remain compatible + // with 32-bit platforms. The size of a `*C.char` (a pointer) is 4 Byte on a 32-bit system + // and (2^29)*4 == math.MaxInt32 + 1. 
+ // See issue golang/go#13656 + cNamesArr := (*[(1 << 29) - 1]*C.char)(unsafe.Pointer(cNames))[:namesLen:namesLen] for i, n := range cNamesArr { names[i] = C.GoString(n) } @@ -224,7 +246,7 @@ func (db *DB) Get(opts *ReadOptions, key []byte) (*Slice, error) { ) cValue := C.rocksdb_get(db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil @@ -239,13 +261,13 @@ func (db *DB) GetBytes(opts *ReadOptions, key []byte) ([]byte, error) { ) cValue := C.rocksdb_get(db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } if cValue == nil { return nil, nil } - defer C.free(unsafe.Pointer(cValue)) + defer C.rocksdb_free(unsafe.Pointer(cValue)) return C.GoBytes(unsafe.Pointer(cValue), C.int(cValLen)), nil } @@ -258,12 +280,26 @@ func (db *DB) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Sli ) cValue := C.rocksdb_get_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil } +// GetPinned returns the data associated with the key from the database. +func (db *DB) GetPinned(opts *ReadOptions, key []byte) (*PinnableSliceHandle, error) { + var ( + cErr *C.char + cKey = byteToChar(key) + ) + cHandle := C.rocksdb_get_pinned(db.c, opts.c, cKey, C.size_t(len(key)), &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewNativePinnableSliceHandle(cHandle), nil +} + // MultiGet returns the data associated with the passed keys from the database func (db *DB) MultiGet(opts *ReadOptions, keys ...[]byte) (Slices, error) { cKeys, cKeySizes := byteSlicesToCSlices(keys) @@ -287,7 +323,7 @@ func (db *DB) MultiGet(opts *ReadOptions, keys ...[]byte) (Slices, error) { for i, rocksErr := range rocksErrs { if rocksErr != nil { - defer C.free(unsafe.Pointer(rocksErr)) + defer C.rocksdb_free(unsafe.Pointer(rocksErr)) err := fmt.Errorf("getting %q failed: %v", string(keys[i]), C.GoString(rocksErr)) errs = append(errs, err) } @@ -339,7 +375,7 @@ func (db *DB) MultiGetCFMultiCF(opts *ReadOptions, cfs ColumnFamilyHandles, keys for i, rocksErr := range rocksErrs { if rocksErr != nil { - defer C.free(unsafe.Pointer(rocksErr)) + defer C.rocksdb_free(unsafe.Pointer(rocksErr)) err := fmt.Errorf("getting %q failed: %v", string(keys[i]), C.GoString(rocksErr)) errs = append(errs, err) } @@ -366,7 +402,7 @@ func (db *DB) Put(opts *WriteOptions, key, value []byte) error { ) C.rocksdb_put(db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -381,7 +417,7 @@ func (db *DB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byt ) C.rocksdb_put_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -395,7 +431,7 @@ func (db *DB) Delete(opts *WriteOptions, key []byte) error { var cErr *C.char cKey = byteToChar(key) )
C.rocksdb_delete(db.c, opts.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -409,7 +445,7 @@ func (db *DB) DeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte) e ) C.rocksdb_delete_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -424,7 +460,7 @@ func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) error { ) C.rocksdb_merge(db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -440,7 +476,7 @@ func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte, va ) C.rocksdb_merge_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -451,7 +487,7 @@ func (db *DB) Write(opts *WriteOptions, batch *WriteBatch) error { var cErr *C.char C.rocksdb_write(db.c, opts.c, batch.c, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -471,6 +507,20 @@ func (db *DB) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator return NewNativeIterator(unsafe.Pointer(cIter)) } +func (db *DB) GetUpdatesSince(seqNumber uint64) (*WalIterator, error) { + var cErr *C.char + cIter := C.rocksdb_get_updates_since(db.c, C.uint64_t(seqNumber), nil, &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewNativeWalIterator(unsafe.Pointer(cIter)), nil +} + +func (db *DB) GetLatestSequenceNumber() uint64 { + return uint64(C.rocksdb_get_latest_sequence_number(db.c)) +} + // NewSnapshot creates a new snapshot of the database. 
func (db *DB) NewSnapshot() *Snapshot { cSnap := C.rocksdb_create_snapshot(db.c) @@ -488,7 +538,7 @@ func (db *DB) GetProperty(propName string) string { cprop := C.CString(propName) defer C.free(unsafe.Pointer(cprop)) cValue := C.rocksdb_property_value(db.c, cprop) - defer C.free(unsafe.Pointer(cValue)) + defer C.rocksdb_free(unsafe.Pointer(cValue)) return C.GoString(cValue) } @@ -497,7 +547,7 @@ func (db *DB) GetPropertyCF(propName string, cf *ColumnFamilyHandle) string { cProp := C.CString(propName) defer C.free(unsafe.Pointer(cProp)) cValue := C.rocksdb_property_value_cf(db.c, cf.c, cProp) - defer C.free(unsafe.Pointer(cValue)) + defer C.rocksdb_free(unsafe.Pointer(cValue)) return C.GoString(cValue) } @@ -510,7 +560,7 @@ func (db *DB) CreateColumnFamily(opts *Options, name string) (*ColumnFamilyHandl defer C.free(unsafe.Pointer(cName)) cHandle := C.rocksdb_create_column_family(db.c, opts.c, cName, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewNativeColumnFamilyHandle(cHandle), nil @@ -521,7 +571,7 @@ func (db *DB) DropColumnFamily(c *ColumnFamilyHandle) error { var cErr *C.char C.rocksdb_drop_column_family(db.c, c.c, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -542,13 +592,21 @@ func (db *DB) GetApproximateSizes(ranges []Range) []uint64 { cLimits := make([]*C.char, len(ranges)) cStartLens := make([]C.size_t, len(ranges)) cLimitLens := make([]C.size_t, len(ranges)) + cErrors := make([]*C.char, len(ranges)) for i, r := range ranges { - cStarts[i] = byteToChar(r.Start) + cStarts[i] = (*C.char)(C.CBytes(r.Start)) cStartLens[i] = C.size_t(len(r.Start)) - cLimits[i] = byteToChar(r.Limit) + cLimits[i] = (*C.char)(C.CBytes(r.Limit)) cLimitLens[i] = C.size_t(len(r.Limit)) } + defer func() { + for i := range ranges { + C.free(unsafe.Pointer(cStarts[i])) + C.free(unsafe.Pointer(cLimits[i])) + } + }() + C.rocksdb_approximate_sizes( db.c, C.int(len(ranges)), @@ -556,7 +614,8 @@ func (db *DB) GetApproximateSizes(ranges []Range) []uint64 { &cStartLens[0], &cLimits[0], &cLimitLens[0], - (*C.uint64_t)(&sizes[0])) + (*C.uint64_t)(&sizes[0]), + &cErrors[0]) return sizes } @@ -576,13 +635,21 @@ func (db *DB) GetApproximateSizesCF(cf *ColumnFamilyHandle, ranges []Range) []ui cLimits := make([]*C.char, len(ranges)) cStartLens := make([]C.size_t, len(ranges)) cLimitLens := make([]C.size_t, len(ranges)) + cErrors := make([]*C.char, len(ranges)) for i, r := range ranges { - cStarts[i] = byteToChar(r.Start) + cStarts[i] = (*C.char)(C.CBytes(r.Start)) cStartLens[i] = C.size_t(len(r.Start)) - cLimits[i] = byteToChar(r.Limit) + cLimits[i] = (*C.char)(C.CBytes(r.Limit)) cLimitLens[i] = C.size_t(len(r.Limit)) } + defer func() { + for i := range ranges { + C.free(unsafe.Pointer(cStarts[i])) + C.free(unsafe.Pointer(cLimits[i])) + } + }() + C.rocksdb_approximate_sizes_cf( db.c, cf.c, @@ -591,11 +658,43 @@ func (db *DB) GetApproximateSizesCF(cf *ColumnFamilyHandle, ranges []Range) []ui &cStartLens[0], &cLimits[0], &cLimitLens[0], - (*C.uint64_t)(&sizes[0])) + (*C.uint64_t)(&sizes[0]), + &cErrors[0]) return sizes } +// SetOptions dynamically changes options through the SetOptions API. 
+func (db *DB) SetOptions(keys, values []string) error { + num_keys := len(keys) + + if num_keys == 0 { + return nil + } + + cKeys := make([]*C.char, num_keys) + cValues := make([]*C.char, num_keys) + for i := range keys { + cKeys[i] = C.CString(keys[i]) + cValues[i] = C.CString(values[i]) + } + + var cErr *C.char + + C.rocksdb_set_options( + db.c, + C.int(num_keys), + &cKeys[0], + &cValues[0], + &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + // LiveFileMetadata is a metadata which is associated with each SST file. type LiveFileMetadata struct { Name string @@ -651,7 +750,7 @@ func (db *DB) Flush(opts *FlushOptions) error { var cErr *C.char C.rocksdb_flush(db.c, opts.c, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -662,7 +761,7 @@ func (db *DB) DisableFileDeletions() error { var cErr *C.char C.rocksdb_disable_file_deletions(db.c, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -673,7 +772,7 @@ func (db *DB) EnableFileDeletions(force bool) error { var cErr *C.char C.rocksdb_enable_file_deletions(db.c, boolToChar(force), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -688,6 +787,50 @@ func (db *DB) DeleteFile(name string) { C.rocksdb_delete_file(db.c, cName) } +// DeleteFileInRange deletes SST files that contain keys between the Range, [r.Start, r.Limit] +func (db *DB) DeleteFileInRange(r Range) error { + cStartKey := byteToChar(r.Start) + cLimitKey := byteToChar(r.Limit) + + var cErr *C.char + + C.rocksdb_delete_file_in_range( + db.c, + cStartKey, C.size_t(len(r.Start)), + cLimitKey, C.size_t(len(r.Limit)), + &cErr, + ) + + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + +// DeleteFileInRangeCF deletes SST files that contain keys between the Range, [r.Start, r.Limit], and +// belong to a given column family +func (db *DB) DeleteFileInRangeCF(cf *ColumnFamilyHandle, r Range) error { + cStartKey := byteToChar(r.Start) + cLimitKey := byteToChar(r.Limit) + + var cErr *C.char + + C.rocksdb_delete_file_in_range_cf( + db.c, + cf.c, + cStartKey, C.size_t(len(r.Start)), + cLimitKey, C.size_t(len(r.Limit)), + &cErr, + ) + + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + // IngestExternalFile loads a list of external SST files. 
func (db *DB) IngestExternalFile(filePaths []string, opts *IngestExternalFileOptions) error { cFilePaths := make([]*C.char, len(filePaths)) @@ -711,7 +854,7 @@ func (db *DB) IngestExternalFile(filePaths []string, opts *IngestExternalFileOpt ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -741,7 +884,7 @@ func (db *DB) IngestExternalFileCF(handle *ColumnFamilyHandle, filePaths []strin ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -756,7 +899,7 @@ func (db *DB) NewCheckpoint() (*Checkpoint, error) { db.c, &cErr, ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } @@ -778,7 +921,7 @@ func DestroyDb(name string, opts *Options) error { defer C.free(unsafe.Pointer(cName)) C.rocksdb_destroy_db(opts.c, cName, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -793,7 +936,7 @@ func RepairDb(name string, opts *Options) error { defer C.free(unsafe.Pointer(cName)) C.rocksdb_repair_db(opts.c, cName, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/db_test.go b/db_test.go old mode 100644 new mode 100755 index c689b35e..4ccc7aa8 --- a/db_test.go +++ b/db_test.go @@ -19,8 +19,8 @@ func TestDBCRUD(t *testing.T) { var ( givenKey = []byte("hello") - givenVal1 = []byte("world1") - givenVal2 = []byte("world2") + givenVal1 = []byte("") + givenVal2 = []byte("world1") wo = NewDefaultWriteOptions() ro = NewDefaultReadOptions() ) @@ -41,11 +41,23 @@ func TestDBCRUD(t *testing.T) { ensure.Nil(t, err) ensure.DeepEqual(t, v2.Data(), givenVal2) + // retrieve pinned + v3, err := db.GetPinned(ro, givenKey) + defer v3.Destroy() + ensure.Nil(t, err) + ensure.DeepEqual(t, v3.Data(), givenVal2) + // delete ensure.Nil(t, db.Delete(wo, givenKey)) - v3, err := db.Get(ro, givenKey) + v4, err := db.Get(ro, givenKey) + ensure.Nil(t, err) + ensure.True(t, v4.Data() == nil) + + // retrieve missing pinned + v5, err := db.GetPinned(ro, givenKey) + defer v5.Destroy() ensure.Nil(t, err) - ensure.True(t, v3.Data() == nil) + ensure.True(t, v5.Data() == nil) } func TestDBCRUDDBPaths(t *testing.T) { @@ -62,12 +74,20 @@ func TestDBCRUDDBPaths(t *testing.T) { var ( givenKey = []byte("hello") - givenVal1 = []byte("world1") - givenVal2 = []byte("world2") + givenVal1 = []byte("") + givenVal2 = []byte("world1") + givenVal3 = []byte("world2") wo = NewDefaultWriteOptions() ro = NewDefaultReadOptions() ) + // retrieve before create + noexist, err := db.Get(ro, givenKey) + defer noexist.Free() + ensure.Nil(t, err) + ensure.False(t, noexist.Exists()) + ensure.DeepEqual(t, noexist.Data(), []byte(nil)) + // create ensure.Nil(t, db.Put(wo, givenKey, givenVal1)) @@ -75,6 +95,7 @@ func TestDBCRUDDBPaths(t *testing.T) { v1, err := db.Get(ro, givenKey) defer v1.Free() ensure.Nil(t, err) + ensure.True(t, v1.Exists()) ensure.DeepEqual(t, v1.Data(), givenVal1) // update @@ -82,13 +103,24 @@ func TestDBCRUDDBPaths(t *testing.T) { v2, err := db.Get(ro, givenKey) defer v2.Free() ensure.Nil(t, err) + ensure.True(t, v2.Exists()) ensure.DeepEqual(t, v2.Data(), givenVal2) + // update + ensure.Nil(t, db.Put(wo, givenKey, givenVal3)) + v3, err := 
db.Get(ro, givenKey) + defer v3.Free() + ensure.Nil(t, err) + ensure.True(t, v3.Exists()) + ensure.DeepEqual(t, v3.Data(), givenVal3) + // delete ensure.Nil(t, db.Delete(wo, givenKey)) - v3, err := db.Get(ro, givenKey) + v4, err := db.Get(ro, givenKey) + defer v4.Free() ensure.Nil(t, err) - ensure.True(t, v3.Data() == nil) + ensure.False(t, v4.Exists()) + ensure.DeepEqual(t, v4.Data(), []byte(nil)) } func newTestDB(t *testing.T, name string, applyOpts func(opts *Options)) *DB { @@ -172,3 +204,42 @@ func TestDBMultiGet(t *testing.T) { ensure.DeepEqual(t, values[2].Data(), givenVal2) ensure.DeepEqual(t, values[3].Data(), givenVal3) } + +func TestDBGetApproximateSizes(t *testing.T) { + db := newTestDB(t, "TestDBGetApproximateSizes", nil) + defer db.Close() + + // no ranges + sizes := db.GetApproximateSizes(nil) + ensure.DeepEqual(t, len(sizes), 0) + + // range with nil start and limit + sizes = db.GetApproximateSizes([]Range{{Start: nil, Limit: nil}}) + ensure.DeepEqual(t, sizes, []uint64{0}) + + // valid range + sizes = db.GetApproximateSizes([]Range{{Start: []byte{0x00}, Limit: []byte{0xFF}}}) + ensure.DeepEqual(t, sizes, []uint64{0}) +} + +func TestDBGetApproximateSizesCF(t *testing.T) { + db := newTestDB(t, "TestDBGetApproximateSizesCF", nil) + defer db.Close() + + o := NewDefaultOptions() + + cf, err := db.CreateColumnFamily(o, "other") + ensure.Nil(t, err) + + // no ranges + sizes := db.GetApproximateSizesCF(cf, nil) + ensure.DeepEqual(t, len(sizes), 0) + + // range with nil start and limit + sizes = db.GetApproximateSizesCF(cf, []Range{{Start: nil, Limit: nil}}) + ensure.DeepEqual(t, sizes, []uint64{0}) + + // valid range + sizes = db.GetApproximateSizesCF(cf, []Range{{Start: []byte{0x00}, Limit: []byte{0xFF}}}) + ensure.DeepEqual(t, sizes, []uint64{0}) +} diff --git a/dynflag.go b/dynflag.go index 81909317..18c18f40 100644 --- a/dynflag.go +++ b/dynflag.go @@ -2,5 +2,5 @@ package gorocksdb -// #cgo LDFLAGS: -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy +// #cgo LDFLAGS: -lrocksdb -lstdc++ -lm -ldl import "C" diff --git a/env.go b/env.go index 386335bc..11e84ef8 100644 --- a/env.go +++ b/env.go @@ -13,6 +13,11 @@ func NewDefaultEnv() *Env { return NewNativeEnv(C.rocksdb_create_default_env()) } +// NewMemEnv creates MemEnv for in-memory testing. +func NewMemEnv() *Env { + return NewNativeEnv(C.rocksdb_create_mem_env()) +} + // NewNativeEnv creates a Environment object. func NewNativeEnv(c *C.rocksdb_env_t) *Env { return &Env{c} diff --git a/filter_policy.go b/filter_policy.go index ac57fd99..a9c222b0 100644 --- a/filter_policy.go +++ b/filter_policy.go @@ -49,6 +49,12 @@ func NewBloomFilter(bitsPerKey int) FilterPolicy { return NewNativeFilterPolicy(C.rocksdb_filterpolicy_create_bloom(C.int(bitsPerKey))) } +// NewBloomFilterFull returns a new filter policy created with use_block_based_builder=false +// (use full or partitioned filter). +func NewBloomFilterFull(bitsPerKey int) FilterPolicy { + return NewNativeFilterPolicy(C.rocksdb_filterpolicy_create_bloom_full(C.int(bitsPerKey))) +} + // Hold references to filter policies.
var filterPolicies = NewCOWList() diff --git a/gorocksdb.c b/gorocksdb.c index c8258376..efebbe51 100644 --- a/gorocksdb.c +++ b/gorocksdb.c @@ -37,7 +37,9 @@ rocksdb_filterpolicy_t* gorocksdb_filterpolicy_create(uintptr_t idx) { (const char *(*)(void*))(gorocksdb_filterpolicy_name)); } -void gorocksdb_filterpolicy_delete_filter(void* state, const char* v, size_t s) { } +void gorocksdb_filterpolicy_delete_filter(void* state, const char* v, size_t s) { + free((char*)v); +} /* Merge Operator */ @@ -51,7 +53,9 @@ rocksdb_mergeoperator_t* gorocksdb_mergeoperator_create(uintptr_t idx) { (const char* (*)(void*))(gorocksdb_mergeoperator_name)); } -void gorocksdb_mergeoperator_delete_value(void* id, const char* v, size_t s) { } +void gorocksdb_mergeoperator_delete_value(void* id, const char* v, size_t s) { + free((char*)v); +} /* Slice Transform */ diff --git a/iterator.go b/iterator.go index 4a280e2c..fefb82f1 100644 --- a/iterator.go +++ b/iterator.go @@ -113,7 +113,7 @@ func (iter *Iterator) Err() error { var cErr *C.char C.rocksdb_iter_get_error(iter.c, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/memory_usage.go b/memory_usage.go index 740b877d..7b9a6ad6 100644 --- a/memory_usage.go +++ b/memory_usage.go @@ -42,7 +42,7 @@ func GetApproximateMemoryUsageByType(dbs []*DB, caches []*Cache) (*MemoryUsage, var cErr *C.char memoryUsage := C.rocksdb_approximate_memory_usage_create(consumers, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } @@ -55,4 +55,4 @@ func GetApproximateMemoryUsageByType(dbs []*DB, caches []*Cache) (*MemoryUsage, CacheTotal: uint64(C.rocksdb_approximate_memory_usage_get_cache_total(memoryUsage)), } return result, nil -} \ No newline at end of file +} diff --git a/merge_operator.go b/merge_operator.go index 33f83948..2de7f9ab 100644 --- a/merge_operator.go +++ b/merge_operator.go @@ -28,6 +28,14 @@ type MergeOperator interface { // internal corruption. This will be treated as an error by the library. FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) + // The name of the MergeOperator. + Name() string +} + +// PartialMerger implements PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) +// When a MergeOperator implements this interface, PartialMerge will be called in addition +// to FullMerge for compactions across levels +type PartialMerger interface { // This function performs merge(left_op, right_op) // when both the operands are themselves merge operation types // that you would have passed to a db.Merge() call in the same order @@ -42,9 +50,28 @@ type MergeOperator interface { // The library will internally keep track of the operations, and apply them in the // correct order once a base-value (a Put/Delete/End-of-Database) is seen. PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) +} - // The name of the MergeOperator.
- Name() string +// MultiMerger implements PartialMergeMulti(key []byte, operands [][]byte) ([]byte, bool) +// When a MergeOperator implements this interface, PartialMergeMulti will be called in addition +// to FullMerge for compactions across levels +type MultiMerger interface { + // PartialMergeMulti performs merge on multiple operands + // when all of the operands are themselves merge operation types + // that you would have passed to a db.Merge() call in the same order + // (i.e.: db.Merge(key,operand[0]), followed by db.Merge(key,operand[1]), + // ... db.Merge(key, operand[n])). + // + // PartialMergeMulti should combine them into a single merge operation. + // The return value should be constructed such that a call to + // db.Merge(key, new_value) would yield the same result as a call + // to db.Merge(key,operand[0]), followed by db.Merge(key,operand[1]), + // ... db.Merge(key, operand[n])). + // + // If it is impossible or infeasible to combine the operations, return false. + // The library will internally keep track of the operations, and apply them in the + // correct order once a base-value (a Put/Delete/End-of-Database) is seen. + PartialMergeMulti(key []byte, operands [][]byte) ([]byte, bool) } // NewNativeMergeOperator creates a MergeOperator object. @@ -110,13 +137,22 @@ func gorocksdb_mergeoperator_partial_merge_multi(idx int, cKey *C.char, cKeyLen success := true merger := mergeOperators.Get(idx).(mergeOperatorWrapper).mergeOperator - leftOperand := operands[0] - for i := 1; i < int(cNumOperands); i++ { - newValue, success = merger.PartialMerge(key, leftOperand, operands[i]) - if !success { - break + + // check if this MergeOperator supports partial or multi merges + switch v := merger.(type) { + case MultiMerger: + newValue, success = v.PartialMergeMulti(key, operands) + case PartialMerger: + leftOperand := operands[0] + for i := 1; i < int(cNumOperands); i++ { + newValue, success = v.PartialMerge(key, leftOperand, operands[i]) + if !success { + break + } + leftOperand = newValue } - leftOperand = newValue + default: + success = false } newValueLen := len(newValue) diff --git a/merge_operator_test.go b/merge_operator_test.go index fd7e0887..9dad6f78 100644 --- a/merge_operator_test.go +++ b/merge_operator_test.go @@ -40,15 +40,146 @@ func TestMergeOperator(t *testing.T) { ensure.DeepEqual(t, v1.Data(), givenMerged) } +func TestPartialMergeOperator(t *testing.T) { + var ( + givenKey = []byte("hello") + startingVal = []byte("foo") + mergeVal1 = []byte("bar") + mergeVal2 = []byte("baz") + fMergeResult = []byte("foobarbaz") + pMergeResult = []byte("barbaz") + ) + + merger := &mockMergePartialOperator{ + fullMerge: func(key, existingValue []byte, operands [][]byte) ([]byte, bool) { + ensure.DeepEqual(&fatalAsError{t}, key, givenKey) + ensure.DeepEqual(&fatalAsError{t}, existingValue, startingVal) + ensure.DeepEqual(&fatalAsError{t}, operands[0], pMergeResult) + return fMergeResult, true + }, + partialMerge: func(key, leftOperand, rightOperand []byte) ([]byte, bool) { + ensure.DeepEqual(&fatalAsError{t}, key, givenKey) + ensure.DeepEqual(&fatalAsError{t}, leftOperand, mergeVal1) + ensure.DeepEqual(&fatalAsError{t}, rightOperand, mergeVal2) + return pMergeResult, true + }, + } + db := newTestDB(t, "TestMergeOperator", func(opts *Options) { + opts.SetMergeOperator(merger) + }) + defer db.Close() + + wo := NewDefaultWriteOptions() + defer wo.Destroy() + + // insert a starting value and compact to trigger merges + ensure.Nil(t, db.Put(wo, givenKey, startingVal)) + + // trigger a compaction to
ensure that a merge is performed + db.CompactRange(Range{nil, nil}) + + // we expect these two operands to be passed to merge partial + ensure.Nil(t, db.Merge(wo, givenKey, mergeVal1)) + ensure.Nil(t, db.Merge(wo, givenKey, mergeVal2)) + + // trigger a compaction to ensure that a + // partial and full merge are performed + db.CompactRange(Range{nil, nil}) + + ro := NewDefaultReadOptions() + v1, err := db.Get(ro, givenKey) + defer v1.Free() + ensure.Nil(t, err) + ensure.DeepEqual(t, v1.Data(), fMergeResult) + +} + +func TestMergeMultiOperator(t *testing.T) { + var ( + givenKey = []byte("hello") + startingVal = []byte("foo") + mergeVal1 = []byte("bar") + mergeVal2 = []byte("baz") + fMergeResult = []byte("foobarbaz") + pMergeResult = []byte("barbaz") + ) + + merger := &mockMergeMultiOperator{ + fullMerge: func(key, existingValue []byte, operands [][]byte) ([]byte, bool) { + ensure.DeepEqual(&fatalAsError{t}, key, givenKey) + ensure.DeepEqual(&fatalAsError{t}, existingValue, startingVal) + ensure.DeepEqual(&fatalAsError{t}, operands[0], pMergeResult) + return fMergeResult, true + }, + partialMergeMulti: func(key []byte, operands [][]byte) ([]byte, bool) { + ensure.DeepEqual(&fatalAsError{t}, key, givenKey) + ensure.DeepEqual(&fatalAsError{t}, operands[0], mergeVal1) + ensure.DeepEqual(&fatalAsError{t}, operands[1], mergeVal2) + return pMergeResult, true + }, + } + db := newTestDB(t, "TestMergeOperator", func(opts *Options) { + opts.SetMergeOperator(merger) + }) + defer db.Close() + + wo := NewDefaultWriteOptions() + defer wo.Destroy() + + // insert a starting value and compact to trigger merges + ensure.Nil(t, db.Put(wo, givenKey, startingVal)) + + // trigger a compaction to ensure that a merge is performed + db.CompactRange(Range{nil, nil}) + + // we expect these two operands to be passed to merge multi + ensure.Nil(t, db.Merge(wo, givenKey, mergeVal1)) + ensure.Nil(t, db.Merge(wo, givenKey, mergeVal2)) + + // trigger a compaction to ensure that a + // partial and full merge are performed + db.CompactRange(Range{nil, nil}) + + ro := NewDefaultReadOptions() + v1, err := db.Get(ro, givenKey) + defer v1.Free() + ensure.Nil(t, err) + ensure.DeepEqual(t, v1.Data(), fMergeResult) + +} + +// Mock Objects type mockMergeOperator struct { - fullMerge func(key, existingValue []byte, operands [][]byte) ([]byte, bool) - partialMerge func(key, leftOperand, rightOperand []byte) ([]byte, bool) + fullMerge func(key, existingValue []byte, operands [][]byte) ([]byte, bool) } func (m *mockMergeOperator) Name() string { return "gorocksdb.test" } func (m *mockMergeOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) { return m.fullMerge(key, existingValue, operands) } -func (m *mockMergeOperator) PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) { + +type mockMergeMultiOperator struct { + fullMerge func(key, existingValue []byte, operands [][]byte) ([]byte, bool) + partialMergeMulti func(key []byte, operands [][]byte) ([]byte, bool) +} + +func (m *mockMergeMultiOperator) Name() string { return "gorocksdb.multi" } +func (m *mockMergeMultiOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) { + return m.fullMerge(key, existingValue, operands) +} +func (m *mockMergeMultiOperator) PartialMergeMulti(key []byte, operands [][]byte) ([]byte, bool) { + return m.partialMergeMulti(key, operands) +} + +type mockMergePartialOperator struct { + fullMerge func(key, existingValue []byte, operands [][]byte) ([]byte, bool) + partialMerge func(key, leftOperand, 
rightOperand []byte) ([]byte, bool) +} + +func (m *mockMergePartialOperator) Name() string { return "gorocksdb.partial" } +func (m *mockMergePartialOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) { + return m.fullMerge(key, existingValue, operands) +} +func (m *mockMergePartialOperator) PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) { return m.partialMerge(key, leftOperand, rightOperand) } diff --git a/options.go b/options.go index 187c2f5c..07000215 100644 --- a/options.go +++ b/options.go @@ -3,7 +3,10 @@ package gorocksdb // #include "rocksdb/c.h" // #include "gorocksdb.h" import "C" -import "unsafe" +import ( + "errors" + "unsafe" +) // CompressionType specifies the block compression. // DB contents are stored in a set of blocks, each of which holds a @@ -57,6 +60,15 @@ const ( FatalInfoLogLevel = InfoLogLevel(4) ) +type WALRecoveryMode int + +const ( + TolerateCorruptedTailRecordsRecovery = WALRecoveryMode(0) + AbsoluteConsistencyRecovery = WALRecoveryMode(1) + PointInTimeRecovery = WALRecoveryMode(2) + SkipAnyCorruptedRecordsRecovery = WALRecoveryMode(3) +) + // Options represent all of the available options when opening a database with Open. type Options struct { c *C.rocksdb_options_t @@ -82,6 +94,30 @@ func NewNativeOptions(c *C.rocksdb_options_t) *Options { return &Options{c: c} } +// GetOptionsFromString creates an Options object from an existing Options object and an option string. +// If base is nil, a default Options object created by NewDefaultOptions will be used as the base. +func GetOptionsFromString(base *Options, optStr string) (*Options, error) { + if base == nil { + base = NewDefaultOptions() + defer base.Destroy() + } + + var ( + cErr *C.char + cOptStr = C.CString(optStr) + ) + defer C.free(unsafe.Pointer(cOptStr)) + + newOpt := NewDefaultOptions() + C.rocksdb_get_options_from_string(base.c, cOptStr, newOpt.c, &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + + return newOpt, nil +} + // ------------------- // Parameters that affect behavior @@ -306,7 +342,7 @@ func (opts *Options) OptimizeUniversalStyleCompaction(memtable_memory_budget uin // so you may wish to adjust this parameter to control memory usage. // Also, a larger write buffer will result in a longer recovery time // the next time the database is opened. -// Default: 4MB +// Default: 64MB func (opts *Options) SetWriteBufferSize(value int) { C.rocksdb_options_set_write_buffer_size(opts.c, C.size_t(value)) } @@ -774,6 +810,14 @@ func (opts *Options) SetDisableAutoCompactions(value bool) { C.rocksdb_options_set_disable_auto_compactions(opts.c, C.int(btoi(value))) } +// SetWALRecoveryMode sets the recovery mode +// +// Recovery mode to control the consistency while replaying WAL +// Default: TolerateCorruptedTailRecordsRecovery +func (opts *Options) SetWALRecoveryMode(mode WALRecoveryMode) { + C.rocksdb_options_set_wal_recovery_mode(opts.c, C.int(mode)) +} + // SetWALTtlSeconds sets the WAL ttl in seconds. // // The following two options affect how archived logs will be deleted.
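Usage note between the options.go hunks (editorial, not part of the diff): the new GetOptionsFromString and SetWALRecoveryMode compose when building options at startup. A hedged sketch follows; the option-string keys, the database path, and the import path are illustrative assumptions.

```go
package main

import (
	"log"

	"github.com/tecbot/gorocksdb" // assumed import path for this binding
)

func main() {
	// Start from the defaults (base == nil) and layer a RocksDB option
	// string on top; write_buffer_size/max_write_buffer_number are just
	// example keys in RocksDB's "key=value;key=value" syntax.
	opts, err := gorocksdb.GetOptionsFromString(nil,
		"write_buffer_size=67108864;max_write_buffer_number=4")
	if err != nil {
		log.Fatalf("parsing option string: %v", err)
	}
	defer opts.Destroy()

	// New in this diff: choose how strictly the WAL is replayed on open.
	opts.SetWALRecoveryMode(gorocksdb.PointInTimeRecovery)

	opts.SetCreateIfMissing(true)
	db, err := gorocksdb.OpenDb(opts, "/tmp/example-db")
	if err != nil {
		log.Fatalf("opening db: %v", err)
	}
	defer db.Close()
}
```

When base is nil, GetOptionsFromString builds on a fresh default Options and destroys that temporary base before returning, so the caller only owns the returned object.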
@@ -802,6 +846,13 @@ func (opts *Options) SetWalSizeLimitMb(value uint64) { C.rocksdb_options_set_WAL_size_limit_MB(opts.c, C.uint64_t(value)) } +// SetEnablePipelinedWrite enables pipelined write +// +// Default: false +func (opts *Options) SetEnablePipelinedWrite(value bool) { + C.rocksdb_options_set_enable_pipelined_write(opts.c, boolToChar(value)) +} + // SetManifestPreallocationSize sets the number of bytes // to preallocate (via fallocate) the manifest files. // @@ -937,6 +988,13 @@ func (opts *Options) SetFIFOCompactionOptions(value *FIFOCompactionOptions) { C.rocksdb_options_set_fifo_compaction_options(opts.c, value.c) } +// GetStatisticsString returns the statistics as a string. +func (opts *Options) GetStatisticsString() string { + sString := C.rocksdb_options_statistics_get_string(opts.c) + defer C.rocksdb_free(unsafe.Pointer(sString)) + return C.GoString(sString) +} + // SetRateLimiter sets the rate limiter of the options. // Use to control write rate of flush and compaction. Flush has higher // priority than compaction. Rate limiting is disabled if nullptr. @@ -1108,15 +1166,44 @@ func (opts *Options) SetAllowIngestBehind(value bool) { C.rocksdb_options_set_allow_ingest_behind(opts.c, boolToChar(value)) } +// SetMemTablePrefixBloomSizeRatio sets memtable_prefix_bloom_size_ratio +// if prefix_extractor is set and memtable_prefix_bloom_size_ratio is not 0, +// create prefix bloom for memtable with the size of +// write_buffer_size * memtable_prefix_bloom_size_ratio. +// If it is larger than 0.25, it is sanitized to 0.25. +// +// Default: 0 (disable) +func (opts *Options) SetMemTablePrefixBloomSizeRatio(value float64) { + C.rocksdb_options_set_memtable_prefix_bloom_size_ratio(opts.c, C.double(value)) +} + +// SetOptimizeFiltersForHits sets optimize_filters_for_hits +// This flag specifies that the implementation should optimize the filters +// mainly for cases where keys are found rather than also optimize for keys +// missed. This would be used in cases where the application knows that +// there are very few misses or the performance in the case of misses is not +// important. +// +// For now, this flag allows us to not store filters for the last level i.e +// the largest level which contains data of the LSM store. For keys which +// are hits, the filters in this level are not useful because we will search +// for the data anyway. NOTE: the filters in other levels are still useful +// even for key hit because they tell us whether to look in that level or go +// to the higher level. +// +// Default: false +func (opts *Options) SetOptimizeFiltersForHits(value bool) { + C.rocksdb_options_set_optimize_filters_for_hits(opts.c, C.int(btoi(value))) +} + // Destroy deallocates the Options object. 
func (opts *Options) Destroy() { C.rocksdb_options_destroy(opts.c) if opts.ccmp != nil { C.rocksdb_comparator_destroy(opts.ccmp) } - if opts.cst != nil { - C.rocksdb_slicetransform_destroy(opts.cst) - } + // don't destroy the opts.cst here, it has already been + // associated with a PrefixExtractor and this will segfault if opts.ccf != nil { C.rocksdb_compactionfilter_destroy(opts.ccf) } diff --git a/options_block_based_table.go b/options_block_based_table.go index e91bed01..80244132 100644 --- a/options_block_based_table.go +++ b/options_block_based_table.go @@ -56,6 +56,14 @@ func (opts *BlockBasedTableOptions) SetCacheIndexAndFilterBlocks(value bool) { C.rocksdb_block_based_options_set_cache_index_and_filter_blocks(opts.c, boolToChar(value)) } +// SetCacheIndexAndFilterBlocksWithHighPriority sets cache index and filter +// blocks with high priority (if cache_index_and_filter_blocks is enabled). +// If set to true, depending on implementation of block cache, +// index and filter blocks may be less likely to be evicted than data blocks. +func (opts *BlockBasedTableOptions) SetCacheIndexAndFilterBlocksWithHighPriority(value bool) { + C.rocksdb_block_based_options_set_cache_index_and_filter_blocks_with_high_priority(opts.c, boolToChar(value)) +} + // SetPinL0FilterAndIndexBlocksInCache sets cache_index_and_filter_blocks. // If is true and the below is true (hash_index_allow_collision), then // filter and index blocks are stored in the cache, but a reference is @@ -65,6 +73,15 @@ func (opts *BlockBasedTableOptions) SetPinL0FilterAndIndexBlocksInCache(value bo C.rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(opts.c, boolToChar(value)) } +// SetPinTopLevelIndexAndFilter set that if cache_index_and_filter_blocks is true, then +// the top-level index of partitioned filter and index blocks are stored in +// the cache, but a reference is held in the "table reader" object so the +// blocks are pinned and only evicted from cache when the table reader is +// freed. This is not limited to l0 in LSM tree. +func (opts *BlockBasedTableOptions) SetPinTopLevelIndexAndFilter(value bool) { + C.rocksdb_block_based_options_set_pin_top_level_index_and_filter(opts.c, boolToChar(value)) +} + // SetBlockSize sets the approximate size of user data packed per block. // Note that the block size specified here corresponds opts uncompressed data. // The actual size of the unit read from disk may be smaller if @@ -94,6 +111,39 @@ func (opts *BlockBasedTableOptions) SetBlockRestartInterval(blockRestartInterval C.rocksdb_block_based_options_set_block_restart_interval(opts.c, C.int(blockRestartInterval)) } +// SetIndexBlockRestartInterval is the same as SetBlockRestartInterval but used for the index block. +// Default: 1 +func (opts *BlockBasedTableOptions) SetIndexBlockRestartInterval(indexBlockRestartInterval int) { + C.rocksdb_block_based_options_set_index_block_restart_interval(opts.c, C.int(indexBlockRestartInterval)) +} + +// SetMetadataBlockSize sets the block size for partitioned metadata. +// Currently applied to indexes when +// kTwoLevelIndexSearch is used and to filters when partition_filters is used. +// Note: Since in the current implementation the filters and index partitions +// are aligned, an index/filter block is created when either index or filter +// block size reaches the specified limit. 
+// Note: this limit is currently applied to only index blocks; a filter +// partition is cut right after an index block is cut +// Default: 4096 +func (opts *BlockBasedTableOptions) SetMetadataBlockSize(metadataBlockSize uint64) { + C.rocksdb_block_based_options_set_metadata_block_size(opts.c, C.uint64_t(metadataBlockSize)) +} + +// SetPartitionFilters sets using partitioned full filters for each SST file. +// This option is incompatible with block-based filters. +// Note: currently this option requires kTwoLevelIndexSearch to be set as well. +// Default: false +func (opts *BlockBasedTableOptions) SetPartitionFilters(value bool) { + C.rocksdb_block_based_options_set_partition_filters(opts.c, boolToChar(value)) +} + +// SetUseDeltaEncoding sets using delta encoding to compress keys in blocks. +// ReadOptions::pin_data requires this option to be disabled. +func (opts *BlockBasedTableOptions) SetUseDeltaEncoding(value bool) { + C.rocksdb_block_based_options_set_use_delta_encoding(opts.c, boolToChar(value)) +} + // SetFilterPolicy sets the filter policy opts reduce disk reads. // Many applications will benefit from passing the result of // NewBloomFilterPolicy() here. @@ -141,6 +191,35 @@ func (opts *BlockBasedTableOptions) SetWholeKeyFiltering(value bool) { C.rocksdb_block_based_options_set_whole_key_filtering(opts.c, boolToChar(value)) } +// SetFormatVersion sets the format version. +// We currently have five versions: +// 0 -- This version is currently written out by all RocksDB's versions by +// default. Can be read by really old RocksDB's. Doesn't support changing +// checksum (default is CRC32). +// 1 -- Can be read by RocksDB's versions since 3.0. Supports non-default +// checksum, like xxHash. It is written by RocksDB when +// BlockBasedTableOptions::checksum is something other than kCRC32c. (version +// 0 is silently upconverted) +// 2 -- Can be read by RocksDB's versions since 3.10. Changes the way we +// encode compressed blocks with LZ4, BZip2 and Zlib compression. If you +// don't plan to run RocksDB before version 3.10, you should probably use +// this. +// 3 -- Can be read by RocksDB's versions since 5.15. Changes the way we +// encode the keys in index blocks. If you don't plan to run RocksDB before +// version 5.15, you should probably use this. +// This option only affects newly written tables. When reading existing +// tables, the information about version is read from the footer. +// 4 -- Can be read by RocksDB's versions since 5.16. Changes the way we +// encode the values in index blocks. If you don't plan to run RocksDB before +// version 5.16 and you are using index_block_restart_interval > 1, you should +// probably use this as it would reduce the index size. +// This option only affects newly written tables. When reading existing +// tables, the information about version is read from the footer. +// Default: 2 +func (opts *BlockBasedTableOptions) SetFormatVersion(version int) { + C.rocksdb_block_based_options_set_format_version(opts.c, C.int(version)) +} + // SetIndexType sets the index type used for this table. // kBinarySearch: // A space efficient index block that is optimized for diff --git a/options_read.go b/options_read.go index 997fa163..6a37cc48 100644 --- a/options_read.go +++ b/options_read.go @@ -48,6 +48,17 @@ func (opts *ReadOptions) SetVerifyChecksums(value bool) { C.rocksdb_readoptions_set_verify_checksums(opts.c, boolToChar(value)) } +// SetPrefixSameAsStart Enforce that the iterator only iterates over the same +// prefix as the seek. 
+// This option is effective only for prefix seeks, i.e. prefix_extractor is +// non-null for the column family and total_order_seek is false. Unlike +// iterate_upper_bound, prefix_same_as_start only works within a prefix +// but in both directions. +// Default: false +func (opts *ReadOptions) SetPrefixSameAsStart(value bool) { + C.rocksdb_readoptions_set_prefix_same_as_start(opts.c, boolToChar(value)) +} + // SetFillCache specify whether the "data block"/"index block"/"filter block" // read for this iteration should be cached in memory? // Callers may wish to set this field to false for bulk scans. diff --git a/slice.go b/slice.go index 22b48e09..707a1f2e 100644 --- a/slice.go +++ b/slice.go @@ -1,6 +1,7 @@ package gorocksdb // #include +// #include "rocksdb/c.h" import "C" import "unsafe" @@ -31,7 +32,8 @@ func StringToSlice(data string) *Slice { return NewSlice(C.CString(data), C.size_t(len(data))) } -// Data returns the data of the slice. +// Data returns the data of the slice. If the key doesn't exist this will be a +// nil slice. func (s *Slice) Data() []byte { return charToByte(s.data, s.size) } @@ -41,10 +43,42 @@ func (s *Slice) Size() int { return int(s.size) } +// Exists returns if the key exists +func (s *Slice) Exists() bool { + return s.data != nil +} + // Free frees the slice data. func (s *Slice) Free() { if !s.freed { - C.free(unsafe.Pointer(s.data)) + C.rocksdb_free(unsafe.Pointer(s.data)) s.freed = true } } + +// PinnableSliceHandle represents a handle to a PinnableSlice. +type PinnableSliceHandle struct { + c *C.rocksdb_pinnableslice_t +} + +// NewNativePinnableSliceHandle creates a PinnableSliceHandle object. +func NewNativePinnableSliceHandle(c *C.rocksdb_pinnableslice_t) *PinnableSliceHandle { + return &PinnableSliceHandle{c} +} + +// Data returns the data of the slice. +func (h *PinnableSliceHandle) Data() []byte { + if h.c == nil { + return nil + } + + var cValLen C.size_t + cValue := C.rocksdb_pinnableslice_value(h.c, &cValLen) + + return charToByte(cValue, cValLen) +} + +// Destroy calls the destructor of the underlying pinnable slice handle. +func (h *PinnableSliceHandle) Destroy() { + C.rocksdb_pinnableslice_destroy(h.c) +} diff --git a/slice_transform.go b/slice_transform.go index e66e4d84..8b9b2362 100644 --- a/slice_transform.go +++ b/slice_transform.go @@ -23,6 +23,11 @@ func NewFixedPrefixTransform(prefixLen int) SliceTransform { return NewNativeSliceTransform(C.rocksdb_slicetransform_create_fixed_prefix(C.size_t(prefixLen))) } +// NewNoopPrefixTransform creates a new no-op prefix transform. +func NewNoopPrefixTransform() SliceTransform { + return NewNativeSliceTransform(C.rocksdb_slicetransform_create_noop()) +} + // NewNativeSliceTransform creates a SliceTransform object. 
func NewNativeSliceTransform(c *C.rocksdb_slicetransform_t) SliceTransform { return nativeSliceTransform{c} diff --git a/slice_transform_test.go b/slice_transform_test.go index 1c551183..d60c7326 100644 --- a/slice_transform_test.go +++ b/slice_transform_test.go @@ -35,6 +35,13 @@ func TestFixedPrefixTransformOpen(t *testing.T) { defer db.Close() } +func TestNewNoopPrefixTransform(t *testing.T) { + db := newTestDB(t, "TestNewNoopPrefixTransform", func(opts *Options) { + opts.SetPrefixExtractor(NewNoopPrefixTransform()) + }) + defer db.Close() +} + type testSliceTransform struct { initiated bool } diff --git a/sst_file_writer.go b/sst_file_writer.go index 0f4689c2..54f2c139 100644 --- a/sst_file_writer.go +++ b/sst_file_writer.go @@ -30,7 +30,7 @@ func (w *SSTFileWriter) Open(path string) error { defer C.free(unsafe.Pointer(cPath)) C.rocksdb_sstfilewriter_open(w.c, cPath, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -44,7 +44,7 @@ func (w *SSTFileWriter) Add(key, value []byte) error { var cErr *C.char C.rocksdb_sstfilewriter_add(w.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -55,7 +55,7 @@ func (w *SSTFileWriter) Finish() error { var cErr *C.char C.rocksdb_sstfilewriter_finish(w.c, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/staticflag_linux.go b/staticflag_linux.go index 6653c498..3af044ef 100644 --- a/staticflag_linux.go +++ b/staticflag_linux.go @@ -2,5 +2,5 @@ package gorocksdb -// #cgo LDFLAGS: -l:librocksdb.a -l:libstdc++.a -l:libz.a -l:libbz2.a -l:libsnappy.a -lm +// #cgo LDFLAGS: -l:librocksdb.a -l:libstdc++.a -lm -ldl import "C" diff --git a/transaction.go b/transaction.go index 49a04bd7..67c9ef09 100644 --- a/transaction.go +++ b/transaction.go @@ -26,7 +26,7 @@ func (transaction *Transaction) Commit() error { ) C.rocksdb_transaction_commit(transaction.c, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -40,7 +40,7 @@ func (transaction *Transaction) Rollback() error { C.rocksdb_transaction_rollback(transaction.c, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -57,7 +57,7 @@ func (transaction *Transaction) Get(opts *ReadOptions, key []byte) (*Slice, erro transaction.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr, ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil @@ -74,7 +74,7 @@ func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Sl transaction.c, opts.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr, ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil @@ -91,7 +91,7 @@ func (transaction *Transaction) Put(key, value []byte) error { transaction.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, ) if cErr != nil { - defer 
C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -105,7 +105,7 @@ func (transaction *Transaction) Delete(key []byte) error { ) C.rocksdb_transaction_delete(transaction.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/transactiondb.go b/transactiondb.go index f5d2fd70..cfdeac9c 100644 --- a/transactiondb.go +++ b/transactiondb.go @@ -30,7 +30,7 @@ func OpenTransactionDb( db := C.rocksdb_transactiondb_open( opts.c, transactionDBOpts.c, cName, &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &TransactionDB{ @@ -83,7 +83,7 @@ func (db *TransactionDB) Get(opts *ReadOptions, key []byte) (*Slice, error) { db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr, ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil @@ -100,7 +100,7 @@ func (db *TransactionDB) Put(opts *WriteOptions, key, value []byte) error { db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -114,7 +114,7 @@ func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error { ) C.rocksdb_transactiondb_delete(db.c, opts.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -129,7 +129,7 @@ func (db *TransactionDB) NewCheckpoint() (*Checkpoint, error) { db.c, &cErr, ) if cErr != nil { - defer C.free(unsafe.Pointer(cErr)) + defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } diff --git a/util.go b/util.go index ae099cd3..b6637ec3 100644 --- a/util.go +++ b/util.go @@ -1,4 +1,5 @@ package gorocksdb + // #include import "C" diff --git a/wal_iterator.go b/wal_iterator.go new file mode 100755 index 00000000..7805d7c9 --- /dev/null +++ b/wal_iterator.go @@ -0,0 +1,49 @@ +package gorocksdb + +// #include +// #include "rocksdb/c.h" +import "C" +import ( + "errors" + "unsafe" +) + +type WalIterator struct { + c *C.rocksdb_wal_iterator_t +} + +func NewNativeWalIterator(c unsafe.Pointer) *WalIterator { + return &WalIterator{(*C.rocksdb_wal_iterator_t)(c)} +} + +func (iter *WalIterator) Valid() bool { + return C.rocksdb_wal_iter_valid(iter.c) != 0 +} + +func (iter *WalIterator) Next() { + C.rocksdb_wal_iter_next(iter.c) +} + +func (iter *WalIterator) Err() error { + var cErr *C.char + C.rocksdb_wal_iter_status(iter.c, &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + +func (iter *WalIterator) Destroy() { + C.rocksdb_wal_iter_destroy(iter.c) + iter.c = nil +} + +// C.rocksdb_wal_iter_get_batch in the official rocksdb c wrapper has memory leak +// see https://github.com/facebook/rocksdb/pull/5515 +// https://github.com/facebook/rocksdb/issues/5536 +func (iter *WalIterator) GetBatch() (*WriteBatch, uint64) { + var cSeq C.uint64_t + cB := C.rocksdb_wal_iter_get_batch(iter.c, &cSeq) + return NewNativeWriteBatch(cB), uint64(cSeq) +} diff --git a/write_batch.go b/write_batch.go 
index 88b5aac8..f894427b 100644 --- a/write_batch.go +++ b/write_batch.go @@ -41,6 +41,12 @@ func (wb *WriteBatch) PutCF(cf *ColumnFamilyHandle, key, value []byte) { C.rocksdb_writebatch_put_cf(wb.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) } +// Append a blob of arbitrary size to the records in this batch. +func (wb *WriteBatch) PutLogData(blob []byte) { + cBlob := byteToChar(blob) + C.rocksdb_writebatch_put_log_data(wb.c, cBlob, C.size_t(len(blob))) +} + // Merge queues a merge of "value" with the existing value of "key". func (wb *WriteBatch) Merge(key, value []byte) { cKey := byteToChar(key) @@ -68,6 +74,21 @@ func (wb *WriteBatch) DeleteCF(cf *ColumnFamilyHandle, key []byte) { C.rocksdb_writebatch_delete_cf(wb.c, cf.c, cKey, C.size_t(len(key))) } +// DeleteRange deletes keys that are between [startKey, endKey) +func (wb *WriteBatch) DeleteRange(startKey []byte, endKey []byte) { + cStartKey := byteToChar(startKey) + cEndKey := byteToChar(endKey) + C.rocksdb_writebatch_delete_range(wb.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey))) +} + +// DeleteRangeCF deletes keys that are between [startKey, endKey) and +// belong to a given column family +func (wb *WriteBatch) DeleteRangeCF(cf *ColumnFamilyHandle, startKey []byte, endKey []byte) { + cStartKey := byteToChar(startKey) + cEndKey := byteToChar(endKey) + C.rocksdb_writebatch_delete_range_cf(wb.c, cf.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey))) +} + // Data returns the serialized version of this batch. func (wb *WriteBatch) Data() []byte { var cSize C.size_t diff --git a/write_batch_test.go b/write_batch_test.go index d6000e57..72eeb36e 100644 --- a/write_batch_test.go +++ b/write_batch_test.go @@ -39,6 +39,18 @@ func TestWriteBatch(t *testing.T) { defer v2.Free() ensure.Nil(t, err) ensure.True(t, v2.Data() == nil) + + // DeleteRange test + wb.Clear() + wb.DeleteRange(givenKey1, givenKey2) + + // perform the batch + ensure.Nil(t, db.Write(wo, wb)) + + v1, err = db.Get(ro, givenKey1) + defer v1.Free() + ensure.Nil(t, err) + ensure.True(t, v1.Data() == nil) } func TestWriteBatchIterator(t *testing.T) {
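Closing usage note (editorial, not part of the diff): the write_batch.go additions combine naturally in a single atomic batch. A hedged sketch under assumed keys; purgeUserRange is a hypothetical helper, and since ';' is the byte after ':', the half-open range below covers every key with the "user:" prefix.

```go
// purgeUserRange shows the new WriteBatch helpers; db is an open *gorocksdb.DB.
func purgeUserRange(db *gorocksdb.DB) error {
	wb := gorocksdb.NewWriteBatch()
	defer wb.Destroy()

	// DeleteRange removes all keys in [startKey, endKey) atomically with
	// the rest of the batch.
	wb.DeleteRange([]byte("user:"), []byte("user;"))

	// PutLogData appends a blob to the WAL only; it is never applied to
	// the database, but WAL readers (e.g. GetUpdatesSince/WalIterator,
	// also added in this diff) can observe it.
	wb.PutLogData([]byte("purged user range"))

	wo := gorocksdb.NewDefaultWriteOptions()
	defer wo.Destroy()
	return db.Write(wo, wb)
}
```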