Skip to content

Commit

Permalink
ensure go buffer keepalive, tecbot#190
Browse files Browse the repository at this point in the history
  • Loading branch information
flier committed Mar 23, 2021
1 parent b327761 commit 8b506a8
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 4 deletions.
19 changes: 19 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "C"
import (
"errors"
"fmt"
"runtime"
"unsafe"
)

Expand Down Expand Up @@ -255,6 +256,7 @@ func (db *DB) Get(opts *ReadOptions, key []byte) (*Slice, error) {
cKey = byteToChar(key)
)
cValue := C.rocksdb_get(db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
Expand All @@ -270,6 +272,7 @@ func (db *DB) GetBytes(opts *ReadOptions, key []byte) ([]byte, error) {
cKey = byteToChar(key)
)
cValue := C.rocksdb_get(db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
Expand All @@ -289,6 +292,7 @@ func (db *DB) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Sli
cKey = byteToChar(key)
)
cValue := C.rocksdb_get_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
Expand All @@ -303,6 +307,7 @@ func (db *DB) GetPinned(opts *ReadOptions, key []byte) (*PinnableSliceHandle, er
cKey = byteToChar(key)
)
cHandle := C.rocksdb_get_pinned(db.c, opts.c, cKey, C.size_t(len(key)), &cErr)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
Expand Down Expand Up @@ -411,6 +416,8 @@ func (db *DB) Put(opts *WriteOptions, key, value []byte) error {
cValue = byteToChar(value)
)
C.rocksdb_put(db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr)
runtime.KeepAlive(key)
runtime.KeepAlive(value)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand All @@ -426,6 +433,8 @@ func (db *DB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byt
cValue = byteToChar(value)
)
C.rocksdb_put_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr)
runtime.KeepAlive(key)
runtime.KeepAlive(value)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand All @@ -440,6 +449,7 @@ func (db *DB) Delete(opts *WriteOptions, key []byte) error {
cKey = byteToChar(key)
)
C.rocksdb_delete(db.c, opts.c, cKey, C.size_t(len(key)), &cErr)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand All @@ -454,6 +464,7 @@ func (db *DB) DeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte) e
cKey = byteToChar(key)
)
C.rocksdb_delete_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cErr)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand All @@ -469,6 +480,8 @@ func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) error {
cValue = byteToChar(value)
)
C.rocksdb_merge(db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr)
runtime.KeepAlive(key)
runtime.KeepAlive(value)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand All @@ -485,6 +498,8 @@ func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte, va
cValue = byteToChar(value)
)
C.rocksdb_merge_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr)
runtime.KeepAlive(key)
runtime.KeepAlive(value)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand Down Expand Up @@ -658,6 +673,7 @@ func (db *DB) CompactRange(r Range) {
cStart := byteToChar(r.Start)
cLimit := byteToChar(r.Limit)
C.rocksdb_compact_range(db.c, cStart, C.size_t(len(r.Start)), cLimit, C.size_t(len(r.Limit)))
runtime.KeepAlive(r)
}

// CompactRangeCF runs a manual compaction on the Range of keys given on the
Expand All @@ -666,6 +682,7 @@ func (db *DB) CompactRangeCF(cf *ColumnFamilyHandle, r Range) {
cStart := byteToChar(r.Start)
cLimit := byteToChar(r.Limit)
C.rocksdb_compact_range_cf(db.c, cf.c, cStart, C.size_t(len(r.Start)), cLimit, C.size_t(len(r.Limit)))
runtime.KeepAlive(r)
}

// Flush triggers a manuel flush for the database.
Expand Down Expand Up @@ -723,6 +740,7 @@ func (db *DB) DeleteFileInRange(r Range) error {
cLimitKey, C.size_t(len(r.Limit)),
&cErr,
)
runtime.KeepAlive(r)

if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
Expand All @@ -746,6 +764,7 @@ func (db *DB) DeleteFileInRangeCF(cf *ColumnFamilyHandle, r Range) error {
cLimitKey, C.size_t(len(r.Limit)),
&cErr,
)
runtime.KeepAlive(r)

if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
Expand Down
3 changes: 3 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "C"
import (
"bytes"
"errors"
"runtime"
"unsafe"
)

Expand Down Expand Up @@ -98,13 +99,15 @@ func (iter *Iterator) SeekToLast() {
func (iter *Iterator) Seek(key []byte) {
cKey := byteToChar(key)
C.rocksdb_iter_seek(iter.c, cKey, C.size_t(len(key)))
runtime.KeepAlive(key)
}

// SeekForPrev moves the iterator to the last key that less than or equal
// to the target key, in contrast with Seek.
func (iter *Iterator) SeekForPrev(key []byte) {
cKey := byteToChar(key)
C.rocksdb_iter_seek_for_prev(iter.c, cKey, C.size_t(len(key)))
runtime.KeepAlive(key)
}

// Err returns nil if no errors happened during iteration, or the actual
Expand Down
1 change: 1 addition & 0 deletions options_block_based_table_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package gorocksdb

// #include "rocksdb/c.h"
import "C"

// DataBlockIndexType specifies the index type that will be used for the data block.
Expand Down
41 changes: 37 additions & 4 deletions options_read.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gorocksdb

// #include <stdlib.h>
// #include "rocksdb/c.h"
import "C"
import (
Expand All @@ -25,7 +26,9 @@ const (
// ReadOptions represent all of the available options when reading from a
// database.
type ReadOptions struct {
c *C.rocksdb_readoptions_t
c *C.rocksdb_readoptions_t
cIterateLowerBound *C.char
cIterateUpperBound *C.char
}

// NewDefaultReadOptions creates a default ReadOptions object.
Expand All @@ -35,7 +38,7 @@ func NewDefaultReadOptions() *ReadOptions {

// NewNativeReadOptions creates a ReadOptions object.
func NewNativeReadOptions(c *C.rocksdb_readoptions_t) *ReadOptions {
return &ReadOptions{c}
return &ReadOptions{c: c}
}

// UnsafeGetReadOptions returns the underlying c read options object.
Expand Down Expand Up @@ -106,9 +109,35 @@ func (opts *ReadOptions) SetTailing(value bool) {
// implemented.
// Default: nullptr
func (opts *ReadOptions) SetIterateUpperBound(key []byte) {
cKey := byteToChar(key)
C.free(unsafe.Pointer(opts.cIterateUpperBound))
if key == nil {
opts.cIterateUpperBound = nil
} else {
opts.cIterateUpperBound = cByteSlice(key)
}
cKeyLen := C.size_t(len(key))
C.rocksdb_readoptions_set_iterate_upper_bound(opts.c, cKey, cKeyLen)
C.rocksdb_readoptions_set_iterate_upper_bound(opts.c, opts.cIterateUpperBound, cKeyLen)
}

// SetIterateLowerBound specifies "iterate_lower_bound", which defines
// the smallest key at which the backward iterator can return an entry.
// Once the bound is passed, Valid() will be false.
// `iterate_lower_bound` is inclusive ie the bound value is a valid entry.
//
// If prefix_extractor is not null, the Seek target and `iterate_lower_bound`
// need to have the same prefix. This is because ordering is not guaranteed
// outside of prefix domain.
//
// Default: nullptr
func (opts *ReadOptions) SetIterateLowerBound(key []byte) {
C.free(unsafe.Pointer(opts.cIterateLowerBound))
if key == nil {
opts.cIterateLowerBound = nil
} else {
opts.cIterateLowerBound = cByteSlice(key)
}
cKeyLen := C.size_t(len(key))
C.rocksdb_readoptions_set_iterate_lower_bound(opts.c, opts.cIterateLowerBound, cKeyLen)
}

// SetPinData specifies the value of "pin_data". If true, it keeps the blocks
Expand Down Expand Up @@ -173,5 +202,9 @@ func (opts *ReadOptions) SetIgnoreRangeDeletions(value bool) {
// Destroy deallocates the ReadOptions object.
func (opts *ReadOptions) Destroy() {
C.rocksdb_readoptions_destroy(opts.c)
C.free(unsafe.Pointer(opts.cIterateLowerBound))
C.free(unsafe.Pointer(opts.cIterateUpperBound))
opts.c = nil
opts.cIterateLowerBound = nil
opts.cIterateUpperBound = nil
}
3 changes: 3 additions & 0 deletions options_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

package gorocksdb

// #include "rocksdb/c.h"
import "C"

// SetAtomicFlush sets atomic_flush
// If true, RocksDB supports flushing multiple column families and committing
// their results atomically to MANIFEST. Note that it is not
Expand Down
1 change: 1 addition & 0 deletions options_write_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package gorocksdb

// #include "rocksdb/c.h"
import "C"

// SetMemtableInsertHintPerBatch specifies the value of "memtable_insert_hint_per_batch".
Expand Down
3 changes: 3 additions & 0 deletions sst_file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "C"

import (
"errors"
"runtime"
"unsafe"
)

Expand Down Expand Up @@ -43,6 +44,8 @@ func (w *SSTFileWriter) Add(key, value []byte) error {
cValue := byteToChar(value)
var cErr *C.char
C.rocksdb_sstfilewriter_add(w.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr)
runtime.KeepAlive(key)
runtime.KeepAlive(value)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand Down
10 changes: 10 additions & 0 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "C"
import (
"errors"
"unsafe"
"runtime"
)

// Transaction is used with TransactionDB for transaction support.
Expand Down Expand Up @@ -56,6 +57,7 @@ func (transaction *Transaction) Get(opts *ReadOptions, key []byte) (*Slice, erro
cValue := C.rocksdb_transaction_get(
transaction.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr,
)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
Expand All @@ -73,6 +75,7 @@ func (transaction *Transaction) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle,
cValue := C.rocksdb_transaction_get_cf(
transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr,
)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
Expand All @@ -90,6 +93,7 @@ func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Sl
cValue := C.rocksdb_transaction_get_for_update(
transaction.c, opts.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr,
)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
Expand All @@ -107,6 +111,8 @@ func (transaction *Transaction) Put(key, value []byte) error {
C.rocksdb_transaction_put(
transaction.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr,
)
runtime.KeepAlive(key)
runtime.KeepAlive(value)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand All @@ -124,6 +130,8 @@ func (transaction *Transaction) PutCF(cf *ColumnFamilyHandle, key, value []byte)
C.rocksdb_transaction_put_cf(
transaction.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr,
)
runtime.KeepAlive(key)
runtime.KeepAlive(value)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand All @@ -138,6 +146,7 @@ func (transaction *Transaction) Delete(key []byte) error {
cKey = byteToChar(key)
)
C.rocksdb_transaction_delete(transaction.c, cKey, C.size_t(len(key)), &cErr)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand All @@ -152,6 +161,7 @@ func (transaction *Transaction) DeleteCF(cf *ColumnFamilyHandle, key []byte) err
cKey = byteToChar(key)
)
C.rocksdb_transaction_delete_cf(transaction.c, cf.c, cKey, C.size_t(len(key)), &cErr)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
Expand Down
5 changes: 5 additions & 0 deletions transaction_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

package gorocksdb

// #include "rocksdb/c.h"
import "C"

import (
"errors"
"unsafe"
)

import "C"
Expand All @@ -18,6 +22,7 @@ func (transaction *Transaction) GetForUpdateCF(opts *ReadOptions, cf *ColumnFami
cValue := C.rocksdb_transaction_get_for_update_cf(
transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr,
)
runtime.KeepAlive(key)
if cErr != nil {
defer C.rocksdb_free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
Expand Down
Loading

0 comments on commit 8b506a8

Please sign in to comment.