diff --git a/db.go b/db.go index 750296f..a4de0e3 100644 --- a/db.go +++ b/db.go @@ -6,6 +6,7 @@ import "C" import ( "errors" "fmt" + "runtime" "unsafe" ) @@ -255,6 +256,7 @@ func (db *DB) Get(opts *ReadOptions, key []byte) (*Slice, error) { cKey = byteToChar(key) ) cValue := C.rocksdb_get(db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -270,6 +272,7 @@ func (db *DB) GetBytes(opts *ReadOptions, key []byte) ([]byte, error) { cKey = byteToChar(key) ) cValue := C.rocksdb_get(db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -289,6 +292,7 @@ func (db *DB) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Sli cKey = byteToChar(key) ) cValue := C.rocksdb_get_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -303,6 +307,7 @@ func (db *DB) GetPinned(opts *ReadOptions, key []byte) (*PinnableSliceHandle, er cKey = byteToChar(key) ) cHandle := C.rocksdb_get_pinned(db.c, opts.c, cKey, C.size_t(len(key)), &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -411,6 +416,8 @@ func (db *DB) Put(opts *WriteOptions, key, value []byte) error { cValue = byteToChar(value) ) C.rocksdb_put(db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) + runtime.KeepAlive(key) + runtime.KeepAlive(value) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -426,6 +433,8 @@ func (db *DB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byt cValue = byteToChar(value) ) C.rocksdb_put_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) + runtime.KeepAlive(key) + runtime.KeepAlive(value) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -440,6 +449,7 @@ func (db *DB) Delete(opts *WriteOptions, key []byte) error { cKey = byteToChar(key) ) C.rocksdb_delete(db.c, opts.c, cKey, C.size_t(len(key)), &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -454,6 +464,7 @@ func (db *DB) DeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte) e cKey = byteToChar(key) ) C.rocksdb_delete_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -469,6 +480,8 @@ func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) error { cValue = byteToChar(value) ) C.rocksdb_merge(db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) + runtime.KeepAlive(key) + runtime.KeepAlive(value) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -485,6 +498,8 @@ func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte, va cValue = byteToChar(value) ) C.rocksdb_merge_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) + runtime.KeepAlive(key) + runtime.KeepAlive(value) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -658,6 +673,7 @@ func (db *DB) CompactRange(r Range) { cStart := byteToChar(r.Start) cLimit := byteToChar(r.Limit) C.rocksdb_compact_range(db.c, cStart, C.size_t(len(r.Start)), cLimit, C.size_t(len(r.Limit))) + runtime.KeepAlive(r) } // CompactRangeCF runs a manual compaction on the Range of keys given on the @@ -666,6 +682,7 @@ func (db *DB) CompactRangeCF(cf *ColumnFamilyHandle, r Range) { cStart := byteToChar(r.Start) cLimit := byteToChar(r.Limit) C.rocksdb_compact_range_cf(db.c, cf.c, cStart, C.size_t(len(r.Start)), cLimit, C.size_t(len(r.Limit))) + runtime.KeepAlive(r) } // Flush triggers a manuel flush for the database. @@ -723,6 +740,7 @@ func (db *DB) DeleteFileInRange(r Range) error { cLimitKey, C.size_t(len(r.Limit)), &cErr, ) + runtime.KeepAlive(r) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) @@ -746,6 +764,7 @@ func (db *DB) DeleteFileInRangeCF(cf *ColumnFamilyHandle, r Range) error { cLimitKey, C.size_t(len(r.Limit)), &cErr, ) + runtime.KeepAlive(r) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) diff --git a/iterator.go b/iterator.go index fefb82f..22014e3 100644 --- a/iterator.go +++ b/iterator.go @@ -6,6 +6,7 @@ import "C" import ( "bytes" "errors" + "runtime" "unsafe" ) @@ -98,6 +99,7 @@ func (iter *Iterator) SeekToLast() { func (iter *Iterator) Seek(key []byte) { cKey := byteToChar(key) C.rocksdb_iter_seek(iter.c, cKey, C.size_t(len(key))) + runtime.KeepAlive(key) } // SeekForPrev moves the iterator to the last key that less than or equal @@ -105,6 +107,7 @@ func (iter *Iterator) Seek(key []byte) { func (iter *Iterator) SeekForPrev(key []byte) { cKey := byteToChar(key) C.rocksdb_iter_seek_for_prev(iter.c, cKey, C.size_t(len(key))) + runtime.KeepAlive(key) } // Err returns nil if no errors happened during iteration, or the actual diff --git a/options_block_based_table_v6.go b/options_block_based_table_v6.go index c2bb1fd..27cc6c7 100644 --- a/options_block_based_table_v6.go +++ b/options_block_based_table_v6.go @@ -2,6 +2,7 @@ package gorocksdb +// #include "rocksdb/c.h" import "C" // DataBlockIndexType specifies the index type that will be used for the data block. diff --git a/options_read.go b/options_read.go index da734d3..fa13de5 100644 --- a/options_read.go +++ b/options_read.go @@ -1,5 +1,6 @@ package gorocksdb +// #include // #include "rocksdb/c.h" import "C" import ( @@ -25,7 +26,9 @@ const ( // ReadOptions represent all of the available options when reading from a // database. type ReadOptions struct { - c *C.rocksdb_readoptions_t + c *C.rocksdb_readoptions_t + cIterateLowerBound *C.char + cIterateUpperBound *C.char } // NewDefaultReadOptions creates a default ReadOptions object. @@ -35,7 +38,7 @@ func NewDefaultReadOptions() *ReadOptions { // NewNativeReadOptions creates a ReadOptions object. func NewNativeReadOptions(c *C.rocksdb_readoptions_t) *ReadOptions { - return &ReadOptions{c} + return &ReadOptions{c: c} } // UnsafeGetReadOptions returns the underlying c read options object. @@ -106,9 +109,35 @@ func (opts *ReadOptions) SetTailing(value bool) { // implemented. // Default: nullptr func (opts *ReadOptions) SetIterateUpperBound(key []byte) { - cKey := byteToChar(key) + C.free(unsafe.Pointer(opts.cIterateUpperBound)) + if key == nil { + opts.cIterateUpperBound = nil + } else { + opts.cIterateUpperBound = cByteSlice(key) + } cKeyLen := C.size_t(len(key)) - C.rocksdb_readoptions_set_iterate_upper_bound(opts.c, cKey, cKeyLen) + C.rocksdb_readoptions_set_iterate_upper_bound(opts.c, opts.cIterateUpperBound, cKeyLen) +} + +// SetIterateLowerBound specifies "iterate_lower_bound", which defines +// the smallest key at which the backward iterator can return an entry. +// Once the bound is passed, Valid() will be false. +// `iterate_lower_bound` is inclusive ie the bound value is a valid entry. +// +// If prefix_extractor is not null, the Seek target and `iterate_lower_bound` +// need to have the same prefix. This is because ordering is not guaranteed +// outside of prefix domain. +// +// Default: nullptr +func (opts *ReadOptions) SetIterateLowerBound(key []byte) { + C.free(unsafe.Pointer(opts.cIterateLowerBound)) + if key == nil { + opts.cIterateLowerBound = nil + } else { + opts.cIterateLowerBound = cByteSlice(key) + } + cKeyLen := C.size_t(len(key)) + C.rocksdb_readoptions_set_iterate_lower_bound(opts.c, opts.cIterateLowerBound, cKeyLen) } // SetPinData specifies the value of "pin_data". If true, it keeps the blocks @@ -173,5 +202,9 @@ func (opts *ReadOptions) SetIgnoreRangeDeletions(value bool) { // Destroy deallocates the ReadOptions object. func (opts *ReadOptions) Destroy() { C.rocksdb_readoptions_destroy(opts.c) + C.free(unsafe.Pointer(opts.cIterateLowerBound)) + C.free(unsafe.Pointer(opts.cIterateUpperBound)) opts.c = nil + opts.cIterateLowerBound = nil + opts.cIterateUpperBound = nil } diff --git a/options_v6.go b/options_v6.go index 5511171..da3eefa 100644 --- a/options_v6.go +++ b/options_v6.go @@ -2,6 +2,9 @@ package gorocksdb +// #include "rocksdb/c.h" +import "C" + // SetAtomicFlush sets atomic_flush // If true, RocksDB supports flushing multiple column families and committing // their results atomically to MANIFEST. Note that it is not diff --git a/options_write_v6.go b/options_write_v6.go index 93e986c..a52dc68 100644 --- a/options_write_v6.go +++ b/options_write_v6.go @@ -2,6 +2,7 @@ package gorocksdb +// #include "rocksdb/c.h" import "C" // SetMemtableInsertHintPerBatch specifies the value of "memtable_insert_hint_per_batch". diff --git a/sst_file_writer.go b/sst_file_writer.go index 54f2c13..753a4c0 100644 --- a/sst_file_writer.go +++ b/sst_file_writer.go @@ -6,6 +6,7 @@ import "C" import ( "errors" + "runtime" "unsafe" ) @@ -43,6 +44,8 @@ func (w *SSTFileWriter) Add(key, value []byte) error { cValue := byteToChar(value) var cErr *C.char C.rocksdb_sstfilewriter_add(w.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) + runtime.KeepAlive(key) + runtime.KeepAlive(value) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) diff --git a/transaction.go b/transaction.go index 5a2889c..09722ed 100644 --- a/transaction.go +++ b/transaction.go @@ -7,6 +7,7 @@ import "C" import ( "errors" "unsafe" + "runtime" ) // Transaction is used with TransactionDB for transaction support. @@ -56,6 +57,7 @@ func (transaction *Transaction) Get(opts *ReadOptions, key []byte) (*Slice, erro cValue := C.rocksdb_transaction_get( transaction.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr, ) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -73,6 +75,7 @@ func (transaction *Transaction) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, cValue := C.rocksdb_transaction_get_cf( transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr, ) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -90,6 +93,7 @@ func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Sl cValue := C.rocksdb_transaction_get_for_update( transaction.c, opts.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr, ) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -107,6 +111,8 @@ func (transaction *Transaction) Put(key, value []byte) error { C.rocksdb_transaction_put( transaction.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, ) + runtime.KeepAlive(key) + runtime.KeepAlive(value) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -124,6 +130,8 @@ func (transaction *Transaction) PutCF(cf *ColumnFamilyHandle, key, value []byte) C.rocksdb_transaction_put_cf( transaction.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, ) + runtime.KeepAlive(key) + runtime.KeepAlive(value) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -138,6 +146,7 @@ func (transaction *Transaction) Delete(key []byte) error { cKey = byteToChar(key) ) C.rocksdb_transaction_delete(transaction.c, cKey, C.size_t(len(key)), &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -152,6 +161,7 @@ func (transaction *Transaction) DeleteCF(cf *ColumnFamilyHandle, key []byte) err cKey = byteToChar(key) ) C.rocksdb_transaction_delete_cf(transaction.c, cf.c, cKey, C.size_t(len(key)), &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) diff --git a/transaction_v6.go b/transaction_v6.go index 4163043..106e9ef 100644 --- a/transaction_v6.go +++ b/transaction_v6.go @@ -2,8 +2,12 @@ package gorocksdb +// #include "rocksdb/c.h" +import "C" + import ( "errors" + "unsafe" ) import "C" @@ -18,6 +22,7 @@ func (transaction *Transaction) GetForUpdateCF(opts *ReadOptions, cf *ColumnFami cValue := C.rocksdb_transaction_get_for_update_cf( transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr, ) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) diff --git a/transactiondb.go b/transactiondb.go index 4654667..8a698db 100644 --- a/transactiondb.go +++ b/transactiondb.go @@ -6,6 +6,7 @@ import "C" import ( "errors" "unsafe" + "runtime" ) // TransactionDB is a reusable handle to a RocksDB transactional database on disk, created by OpenTransactionDb. @@ -101,6 +102,7 @@ func (db *TransactionDB) Get(opts *ReadOptions, key []byte) (*Slice, error) { cValue := C.rocksdb_transactiondb_get( db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr, ) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -118,6 +120,7 @@ func (db *TransactionDB) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key [] cValue := C.rocksdb_transactiondb_get_cf( db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr, ) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) @@ -135,6 +138,8 @@ func (db *TransactionDB) Put(opts *WriteOptions, key, value []byte) error { C.rocksdb_transactiondb_put( db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, ) + runtime.KeepAlive(key) + runtime.KeepAlive(value) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -152,6 +157,8 @@ func (db *TransactionDB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, C.rocksdb_transactiondb_put_cf( db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, ) + runtime.KeepAlive(key) + runtime.KeepAlive(value) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -166,6 +173,7 @@ func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error { cKey = byteToChar(key) ) C.rocksdb_transactiondb_delete(db.c, opts.c, cKey, C.size_t(len(key)), &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) @@ -180,6 +188,7 @@ func (db *TransactionDB) DeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, ke cKey = byteToChar(key) ) C.rocksdb_transactiondb_delete_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cErr) + runtime.KeepAlive(key) if cErr != nil { defer C.rocksdb_free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) diff --git a/transactiondb_v6.go b/transactiondb_v6.go index 53b69d4..c20728c 100644 --- a/transactiondb_v6.go +++ b/transactiondb_v6.go @@ -4,8 +4,11 @@ package gorocksdb import ( "errors" + "unsafe" ) +// #include +// #include "rocksdb/c.h" import "C" // OpenTransactionDbColumnFamilies opens a database with the specified options. diff --git a/write_batch.go b/write_batch.go index f894427..df14df9 100644 --- a/write_batch.go +++ b/write_batch.go @@ -5,6 +5,7 @@ import "C" import ( "errors" "io" + "runtime" ) // WriteBatch is a batching of Puts, Merges and Deletes. @@ -32,6 +33,8 @@ func (wb *WriteBatch) Put(key, value []byte) { cKey := byteToChar(key) cValue := byteToChar(value) C.rocksdb_writebatch_put(wb.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) + runtime.KeepAlive(key) + runtime.KeepAlive(value) } // PutCF queues a key-value pair in a column family. @@ -39,12 +42,15 @@ func (wb *WriteBatch) PutCF(cf *ColumnFamilyHandle, key, value []byte) { cKey := byteToChar(key) cValue := byteToChar(value) C.rocksdb_writebatch_put_cf(wb.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) + runtime.KeepAlive(key) + runtime.KeepAlive(value) } // Append a blob of arbitrary size to the records in this batch. func (wb *WriteBatch) PutLogData(blob []byte) { cBlob := byteToChar(blob) C.rocksdb_writebatch_put_log_data(wb.c, cBlob, C.size_t(len(blob))) + runtime.KeepAlive(blob) } // Merge queues a merge of "value" with the existing value of "key". @@ -52,6 +58,8 @@ func (wb *WriteBatch) Merge(key, value []byte) { cKey := byteToChar(key) cValue := byteToChar(value) C.rocksdb_writebatch_merge(wb.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) + runtime.KeepAlive(key) + runtime.KeepAlive(value) } // MergeCF queues a merge of "value" with the existing value of "key" in a @@ -60,18 +68,22 @@ func (wb *WriteBatch) MergeCF(cf *ColumnFamilyHandle, key, value []byte) { cKey := byteToChar(key) cValue := byteToChar(value) C.rocksdb_writebatch_merge_cf(wb.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) + runtime.KeepAlive(key) + runtime.KeepAlive(value) } // Delete queues a deletion of the data at key. func (wb *WriteBatch) Delete(key []byte) { cKey := byteToChar(key) C.rocksdb_writebatch_delete(wb.c, cKey, C.size_t(len(key))) + runtime.KeepAlive(key) } // DeleteCF queues a deletion of the data at key in a column family. func (wb *WriteBatch) DeleteCF(cf *ColumnFamilyHandle, key []byte) { cKey := byteToChar(key) C.rocksdb_writebatch_delete_cf(wb.c, cf.c, cKey, C.size_t(len(key))) + runtime.KeepAlive(key) } // DeleteRange deletes keys that are between [startKey, endKey) @@ -79,6 +91,8 @@ func (wb *WriteBatch) DeleteRange(startKey []byte, endKey []byte) { cStartKey := byteToChar(startKey) cEndKey := byteToChar(endKey) C.rocksdb_writebatch_delete_range(wb.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey))) + runtime.KeepAlive(startKey) + runtime.KeepAlive(endKey) } // DeleteRangeCF deletes keys that are between [startKey, endKey) and @@ -87,6 +101,8 @@ func (wb *WriteBatch) DeleteRangeCF(cf *ColumnFamilyHandle, startKey []byte, end cStartKey := byteToChar(startKey) cEndKey := byteToChar(endKey) C.rocksdb_writebatch_delete_range_cf(wb.c, cf.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey))) + runtime.KeepAlive(startKey) + runtime.KeepAlive(endKey) } // Data returns the serialized version of this batch.