Skip to content

Commit

Permalink
Add support for optimistic transaction db, write batch with index, ke…
Browse files Browse the repository at this point in the history
…yMayExist and empty keys. (#63)

* Add support for optimistic transaction db.

* Add keyMayExist to RocksDbRef.

* Add support for write batch with index.

* Allow empty keys to be used in API.
  • Loading branch information
bhartnett authored Jul 8, 2024
1 parent 6b7de57 commit cf1267e
Show file tree
Hide file tree
Showing 23 changed files with 1,106 additions and 86 deletions.
7 changes: 4 additions & 3 deletions rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

import
./rocksdb/[
backup, columnfamily, rocksdb, rocksiterator, sstfilewriter, transactiondb,
writebatch,
backup, columnfamily, optimistictxdb, rocksdb, rocksiterator, sstfilewriter,
transactiondb, writebatch, writebatchwi,
]

export
backup, columnfamily, rocksdb, rocksiterator, sstfilewriter, transactiondb, writebatch
backup, columnfamily, optimistictxdb, rocksdb, rocksiterator, sstfilewriter,
transactiondb, writebatch, writebatchwi
6 changes: 6 additions & 0 deletions rocksdb/internal/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ template bailOnErrors*(errors: cstring): auto =
let res = err($(errors))
rocksdb_free(errors)
return res

template unsafeAddrOrNil*(s: openArray[byte]): auto =
  ## Expands to the address of the first byte of `s`, or to `nil` when `s`
  ## has no elements, so that `s[0]` is never indexed on an empty buffer.
  ## Note that `s` appears twice in the expansion.
  (if s.len > 0: unsafeAddr(s[0]) else: nil)
131 changes: 131 additions & 0 deletions rocksdb/optimistictxdb.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

## An `OptimisticTxDbRef` can be used to open a connection to the RocksDB database
## with support for transactional operations against multiple column families.
## To create a new transaction call `beginTransaction` which will return a
## `TransactionRef`. To commit or rollback the transaction call `commit` or
## `rollback` on the `TransactionRef` type after applying changes to the transaction.

{.push raises: [].}

import
std/[sequtils, locks],
./lib/librocksdb,
./options/[dbopts, readopts, writeopts],
./transactions/[transaction, otxopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils],
./rocksresult

export dbopts, cfdescriptor, readopts, writeopts, otxopts, transaction, rocksresult

type
  # Raw pointer to the C-level optimistic transaction database.
  OptimisticTxDbPtr* = ptr rocksdb_optimistictransactiondb_t

  OptimisticTxDbRef* = ref object
    ## Handle to an optimistic transaction database opened with
    ## `openOptimisticTxDb`.
    lock: Lock ## taken by `close` while tearing the database down
    cPtr: OptimisticTxDbPtr ## reset to `nil` by `close`; `isClosed` checks it
    path: string ## path that was passed to `openOptimisticTxDb`
    dbOpts: DbOptionsRef ## options the database was opened with
    cfDescriptors: seq[ColFamilyDescriptor] ## descriptors used when opening
    defaultCfHandle: ColFamilyHandleRef ## handle of the default column family
    cfTable: ColFamilyTableRef ## lookup of column family handles by name

proc openOptimisticTxDb*(
    path: string,
    dbOpts = defaultDbOptions(autoClose = true),
    columnFamilies: openArray[ColFamilyDescriptor] = [],
): RocksDBResult[OptimisticTxDbRef] =
  ## Open an `OptimisticTxDbRef` with the given options and column families.
  ## If no column families are provided the default column family will be used.
  ## If no options are provided the default options will be used.
  ## These default options will be closed when the database is closed.
  ## If any options are provided, they will need to be closed manually.
  ##
  ## Returns the opened database, or the error string reported by RocksDB.
  ## When opening fails, `autoCloseNonNil`/`autoCloseAll` are run on the
  ## options and column family descriptors before the error is returned.

  # The default column family is always opened alongside the requested ones.
  var cfs = columnFamilies.toSeq()
  if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()):
    cfs.add(defaultColFamilyDescriptor(autoClose = true))

  # Keep the names in an owned seq for the whole call: a `cstring` is only a
  # borrowed pointer, so it must never be taken from a temporary string.
  let names = cfs.mapIt(it.name())

  var
    cfNames = newSeq[cstring](names.len)
    cfOpts = cfs.mapIt(it.options.cPtr)
    cfHandles = newSeq[ColFamilyHandlePtr](cfs.len)
    errors: cstring

  for i in 0 ..< names.len:
    cfNames[i] = names[i].cstring

  # `cfs` holds at least the default column family, so indexing [0] is safe.
  let txDbPtr = rocksdb_optimistictransactiondb_open_column_families(
    dbOpts.cPtr,
    path.cstring,
    cfNames.len().cint,
    cast[cstringArray](cfNames[0].addr),
    cfOpts[0].addr,
    cfHandles[0].addr,
    cast[cstringArray](errors.addr),
  )
  bailOnErrorsWithCleanup(errors):
    autoCloseNonNil(dbOpts)
    autoCloseAll(cfs)

  let
    cfTable = newColFamilyTable(names, cfHandles)
    db = OptimisticTxDbRef(
      lock: createLock(),
      cPtr: txDbPtr,
      path: path,
      dbOpts: dbOpts,
      cfDescriptors: cfs,
      defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME),
      cfTable: cfTable,
    )
  ok(db)

proc getColFamilyHandle*(
    db: OptimisticTxDbRef, name: string
): RocksDBResult[ColFamilyHandleRef] =
  ## Look up the column family handle stored under `name` in the database's
  ## column family table. Yields an error when the table has no handle for it.
  let handle = db.cfTable.get(name)
  if not handle.isNil():
    return ok(handle)

  err("rocksdb: unknown column family")

proc isClosed*(db: OptimisticTxDbRef): bool {.inline.} =
  ## `true` once the `OptimisticTxDbRef` has been closed, i.e. when its
  ## underlying database pointer is nil.
  isNil(db.cPtr)

proc beginTransaction*(
    db: OptimisticTxDbRef,
    readOpts = defaultReadOptions(autoClose = true),
    writeOpts = defaultWriteOptions(autoClose = true),
    otxOpts = defaultOptimisticTxOptions(autoClose = true),
    cfHandle = db.defaultCfHandle,
): TransactionRef =
  ## Start a new transaction against the database. Operations on the returned
  ## `TransactionRef` default to `cfHandle`; when none is given the default
  ## column family of the database is used.
  ##
  ## The database must still be open.
  doAssert not db.isClosed()

  # The `nil` in the fourth position is the slot used for the options of
  # non-optimistic transactions; only `otxOpts` applies here.
  newTransaction(
    rocksdb_optimistictransaction_begin(db.cPtr, writeOpts.cPtr, otxOpts.cPtr, nil),
    readOpts,
    writeOpts,
    nil,
    otxOpts,
    cfHandle,
  )

proc close*(db: OptimisticTxDbRef) =
  ## Close the `OptimisticTxDbRef`. Closing an already closed database is a
  ## no-op. The whole operation runs while holding the database lock.
  withLock(db.lock):
    if db.isClosed():
      return

    # the column families should be closed before the database
    db.cfTable.close()

    rocksdb_optimistictransactiondb_close(db.cPtr)
    db.cPtr = nil

    # opts should be closed after the database is closed
    autoCloseNonNil(db.dbOpts)
    autoCloseAll(db.cfDescriptors)
90 changes: 66 additions & 24 deletions rocksdb/rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import
./options/[dbopts, readopts, writeopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils],
./rocksiterator,
./rocksresult,
./writebatch
./[rocksiterator, rocksresult, writebatch, writebatchwi]

export rocksresult, dbopts, readopts, writeopts, cfdescriptor, rocksiterator, writebatch
export
rocksresult, dbopts, readopts, writeopts, cfdescriptor, cfhandle, rocksiterator,
writebatch, writebatchwi

type
RocksDbPtr* = ptr rocksdb_t
Expand Down Expand Up @@ -236,17 +236,14 @@ proc get*(
## The `onData` callback reduces the number of copies and therefore should be
## preferred if performance is required.

if key.len() == 0:
return err("rocksdb: key is empty")

var
len: csize_t
errors: cstring
let data = rocksdb_get_cf(
db.cPtr,
db.readOpts.cPtr,
cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
len.addr,
cast[cstringArray](errors.addr),
Expand Down Expand Up @@ -283,38 +280,52 @@ proc put*(
): RocksDBResult[void] =
## Put the value for the given key into the specified column family.

if key.len() == 0:
return err("rocksdb: key is empty")

var errors: cstring
rocksdb_put_cf(
db.cPtr,
db.writeOpts.cPtr,
cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
cast[cstring](if val.len > 0:
unsafeAddr val[0]
else:
nil
),
cast[cstring](val.unsafeAddrOrNil()),
csize_t(val.len),
cast[cstringArray](errors.addr),
)
bailOnErrors(errors)

ok()

proc keyMayExist*(
    db: RocksDbRef, key: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[bool] =
  ## Probabilistic existence check for `key` in the given column family.
  ## A result of `false` means the key definitely does not exist; `true`
  ## means the key might exist. False positives are possible, false negatives
  ## are not. This check is potentially lighter-weight than invoking
  ## `keyExists`.

  # Only the yes/no answer is requested: every optional trailing argument of
  # the C call is passed as nil/0.
  let mayExist = rocksdb_key_may_exist_cf(
    db.cPtr,
    db.readOpts.cPtr,
    cfHandle.cPtr,
    cast[cstring](key.unsafeAddrOrNil()),
    csize_t(key.len),
    nil,
    nil,
    nil,
    0,
    nil,
  )

  ok(mayExist.bool)

proc keyExists*(
db: RocksDbRef, key: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[bool] =
## Check if the key exists in the specified column family.
## Returns a result containing `true` if the key exists or a result
## containing `false` otherwise.

# TODO: Call rocksdb_key_may_exist_cf to improve performance for the case
# when the key does not exist

db.get(
key,
proc(data: openArray[byte]) =
Expand All @@ -330,15 +341,12 @@ proc delete*(
## If the value does not exist, the delete will be a no-op.
## To check if the value exists before or after a delete, use `keyExists`.

if key.len() == 0:
return err("rocksdb: key is empty")

var errors: cstring
rocksdb_delete_cf(
db.cPtr,
db.writeOpts.cPtr,
cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
cast[cstringArray](errors.addr),
)
Expand All @@ -350,6 +358,7 @@ proc openIterator*(
db: RocksDbRef, cfHandle = db.defaultCfHandle
): RocksDBResult[RocksIteratorRef] =
## Opens an `RocksIteratorRef` for the specified column family.
## The iterator should be closed using the `close` method after usage.
doAssert not db.isClosed()

let rocksIterPtr =
Expand All @@ -361,10 +370,31 @@ proc openWriteBatch*(
db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle
): WriteBatchRef =
## Opens a `WriteBatchRef` which defaults to using the specified column family.
## The write batch should be closed using the `close` method after usage.
doAssert not db.isClosed()

createWriteBatch(cfHandle)

proc openWriteBatchWithIndex*(
    db: RocksDbReadWriteRef,
    reservedBytes = 0,
    overwriteKey = false,
    cfHandle = db.defaultCfHandle,
): WriteBatchWIRef =
  ## Opens a `WriteBatchWIRef` which defaults to using the specified column
  ## family. The write batch should be closed using the `close` method after
  ## usage.
  ##
  ## A `WriteBatchWIRef` is similar to a `WriteBatchRef`, but additionally
  ## maintains a binary searchable index over all inserted keys, which allows
  ## reading back the data that has been written to the batch.
  ##
  ## `reservedBytes` optionally sets the number of bytes to reserve for the
  ## batch. Setting `overwriteKey` to true overwrites the key in the index when
  ## a duplicate key is inserted, so that an iterator never shows two entries
  ## with the same key.
  doAssert not db.isClosed()

  createWriteBatch(reservedBytes, overwriteKey, db.dbOpts, cfHandle)

proc write*(db: RocksDbReadWriteRef, updates: WriteBatchRef): RocksDBResult[void] =
## Apply the updates in the `WriteBatchRef` to the database.
doAssert not db.isClosed()
Expand All @@ -377,6 +407,18 @@ proc write*(db: RocksDbReadWriteRef, updates: WriteBatchRef): RocksDBResult[void

ok()

proc write*(db: RocksDbReadWriteRef, updates: WriteBatchWIRef): RocksDBResult[void] =
  ## Write the contents of the given `WriteBatchWIRef` to the database using
  ## the database's write options. Returns the error reported by RocksDB if
  ## the write fails. The database must still be open.
  doAssert not db.isClosed()

  var errMsg: cstring
  rocksdb_write_writebatch_wi(
    db.cPtr,
    db.writeOpts.cPtr,
    updates.cPtr,
    cast[cstringArray](errMsg.addr),
  )
  bailOnErrors(errMsg)

  ok()

proc ingestExternalFile*(
db: RocksDbReadWriteRef, filePath: string, cfHandle = db.defaultCfHandle
): RocksDBResult[void] =
Expand Down
2 changes: 1 addition & 1 deletion rocksdb/rocksiterator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ proc seekToKey*(iter: RocksIteratorRef, key: openArray[byte]) =
## invalid.
##
doAssert not iter.isClosed()
rocksdb_iter_seek(iter.cPtr, cast[cstring](unsafeAddr key[0]), csize_t(key.len))
rocksdb_iter_seek(iter.cPtr, cast[cstring](key.unsafeAddrOrNil()), csize_t(key.len))

proc seekToFirst*(iter: RocksIteratorRef) =
## Seeks to the first entry in the column family.
Expand Down
6 changes: 3 additions & 3 deletions rocksdb/sstfilewriter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ proc put*(
var errors: cstring
rocksdb_sstfilewriter_put(
writer.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
cast[cstring](unsafeAddr val[0]),
cast[cstring](val.unsafeAddrOrNil()),
csize_t(val.len),
cast[cstringArray](errors.addr),
)
Expand All @@ -77,7 +77,7 @@ proc delete*(writer: SstFileWriterRef, key: openArray[byte]): RocksDBResult[void
var errors: cstring
rocksdb_sstfilewriter_delete(
writer.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
cast[cstringArray](errors.addr),
)
Expand Down
4 changes: 1 addition & 3 deletions rocksdb/transactiondb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,11 @@ proc beginTransaction*(
## Begin a new transaction against the database. The transaction will default
## to using the specified column family. If no column family is specified
## then the default column family will be used.
##
##
doAssert not db.isClosed()

let txPtr = rocksdb_transaction_begin(db.cPtr, writeOpts.cPtr, txOpts.cPtr, nil)

newTransaction(txPtr, readOpts, writeOpts, txOpts, cfHandle)
newTransaction(txPtr, readOpts, writeOpts, txOpts, nil, cfHandle)

proc close*(db: TransactionDbRef) =
## Close the `TransactionDbRef`.
Expand Down
Loading

0 comments on commit cf1267e

Please sign in to comment.