Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for optimistic transaction db, write batch with index, keyMayExist and empty keys. #63

Merged
merged 5 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

import
./rocksdb/[
backup, columnfamily, rocksdb, rocksiterator, sstfilewriter, transactiondb,
writebatch,
backup, columnfamily, optimistictxdb, rocksdb, rocksiterator, sstfilewriter,
transactiondb, writebatch, writebatchwi,
]

export
backup, columnfamily, rocksdb, rocksiterator, sstfilewriter, transactiondb, writebatch
backup, columnfamily, optimistictxdb, rocksdb, rocksiterator, sstfilewriter,
transactiondb, writebatch, writebatchwi
6 changes: 6 additions & 0 deletions rocksdb/internal/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ template bailOnErrors*(errors: cstring): auto =
let res = err($(errors))
rocksdb_free(errors)
return res

template unsafeAddrOrNil*(s: openArray[byte]): auto =
if s.len > 0:
unsafeAddr s[0]
else:
nil
131 changes: 131 additions & 0 deletions rocksdb/optimistictxdb.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

## A `OptimisticTxDbRef` can be used to open a connection to the RocksDB database
## with support for transactional operations against multiple column families.
## To create a new transaction call `beginTransaction` which will return a
## `TransactionRef`. To commit or rollback the transaction call `commit` or
## `rollback` on the `TransactionRef` type after applying changes to the transaction.

{.push raises: [].}

import
std/[sequtils, locks],
./lib/librocksdb,
./options/[dbopts, readopts, writeopts],
./transactions/[transaction, otxopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils],
./rocksresult

export dbopts, cfdescriptor, readopts, writeopts, otxopts, transaction, rocksresult

type
OptimisticTxDbPtr* = ptr rocksdb_optimistictransactiondb_t

OptimisticTxDbRef* = ref object
lock: Lock
cPtr: OptimisticTxDbPtr
path: string
dbOpts: DbOptionsRef
cfDescriptors: seq[ColFamilyDescriptor]
defaultCfHandle: ColFamilyHandleRef
cfTable: ColFamilyTableRef

proc openOptimisticTxDb*(
path: string,
dbOpts = defaultDbOptions(autoClose = true),
columnFamilies: openArray[ColFamilyDescriptor] = [],
): RocksDBResult[OptimisticTxDbRef] =
## Open a `OptimisticTxDbRef` with the given options and column families.
## If no column families are provided the default column family will be used.
## If no options are provided the default options will be used.
## These default options will be closed when the database is closed.
## If any options are provided, they will need to be closed manually.

var cfs = columnFamilies.toSeq()
if DEFAULT_COLUMN_FAMILY_NAME notin columnFamilies.mapIt(it.name()):
cfs.add(defaultColFamilyDescriptor(autoClose = true))

var
cfNames = cfs.mapIt(it.name().cstring)
cfOpts = cfs.mapIt(it.options.cPtr)
cfHandles = newSeq[ColFamilyHandlePtr](cfs.len)
errors: cstring

let txDbPtr = rocksdb_optimistictransactiondb_open_column_families(
dbOpts.cPtr,
path.cstring,
cfNames.len().cint,
cast[cstringArray](cfNames[0].addr),
cfOpts[0].addr,
cfHandles[0].addr,
cast[cstringArray](errors.addr),
)
bailOnErrorsWithCleanup(errors):
autoCloseNonNil(dbOpts)
autoCloseAll(cfs)

let
cfTable = newColFamilyTable(cfNames.mapIt($it), cfHandles)
db = OptimisticTxDbRef(
lock: createLock(),
cPtr: txDbPtr,
path: path,
dbOpts: dbOpts,
cfDescriptors: cfs,
defaultCfHandle: cfTable.get(DEFAULT_COLUMN_FAMILY_NAME),
cfTable: cfTable,
)
ok(db)

proc getColFamilyHandle*(
db: OptimisticTxDbRef, name: string
): RocksDBResult[ColFamilyHandleRef] =
let cfHandle = db.cfTable.get(name)
if cfHandle.isNil():
err("rocksdb: unknown column family")
else:
ok(cfHandle)

proc isClosed*(db: OptimisticTxDbRef): bool {.inline.} =
## Returns `true` if the `OptimisticTxDbRef` has been closed.
db.cPtr.isNil()

proc beginTransaction*(
db: OptimisticTxDbRef,
readOpts = defaultReadOptions(autoClose = true),
writeOpts = defaultWriteOptions(autoClose = true),
otxOpts = defaultOptimisticTxOptions(autoClose = true),
cfHandle = db.defaultCfHandle,
): TransactionRef =
## Begin a new transaction against the database. The transaction will default
## to using the specified column family. If no column family is specified
## then the default column family will be used.
doAssert not db.isClosed()

let txPtr =
rocksdb_optimistictransaction_begin(db.cPtr, writeOpts.cPtr, otxOpts.cPtr, nil)

newTransaction(txPtr, readOpts, writeOpts, nil, otxOpts, cfHandle)

proc close*(db: OptimisticTxDbRef) =
## Close the `OptimisticTxDbRef`.

withLock(db.lock):
if not db.isClosed():
# the column families should be closed before the database
db.cfTable.close()

rocksdb_optimistictransactiondb_close(db.cPtr)
db.cPtr = nil

# opts should be closed after the database is closed
autoCloseNonNil(db.dbOpts)
autoCloseAll(db.cfDescriptors)
90 changes: 66 additions & 24 deletions rocksdb/rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import
./options/[dbopts, readopts, writeopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils],
./rocksiterator,
./rocksresult,
./writebatch
./[rocksiterator, rocksresult, writebatch, writebatchwi]

export rocksresult, dbopts, readopts, writeopts, cfdescriptor, rocksiterator, writebatch
export
rocksresult, dbopts, readopts, writeopts, cfdescriptor, cfhandle, rocksiterator,
writebatch, writebatchwi

type
RocksDbPtr* = ptr rocksdb_t
Expand Down Expand Up @@ -236,17 +236,14 @@ proc get*(
## The `onData` callback reduces the number of copies and therefore should be
## preferred if performance is required.

if key.len() == 0:
return err("rocksdb: key is empty")

var
len: csize_t
errors: cstring
let data = rocksdb_get_cf(
db.cPtr,
db.readOpts.cPtr,
cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
len.addr,
cast[cstringArray](errors.addr),
Expand Down Expand Up @@ -283,38 +280,52 @@ proc put*(
): RocksDBResult[void] =
## Put the value for the given key into the specified column family.

if key.len() == 0:
return err("rocksdb: key is empty")

var errors: cstring
rocksdb_put_cf(
db.cPtr,
db.writeOpts.cPtr,
cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
cast[cstring](if val.len > 0:
unsafeAddr val[0]
else:
nil
),
cast[cstring](val.unsafeAddrOrNil()),
csize_t(val.len),
cast[cstringArray](errors.addr),
)
bailOnErrors(errors)

ok()

proc keyMayExist*(
db: RocksDbRef, key: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[bool] =
## If the key definitely does not exist in the database, then this method
## returns false, otherwise it returns true if the key might exist. That is
## to say that this method is probabilistic and may return false positives,
## but never a false negative. This check is potentially lighter-weight than
## invoking keyExists.

let keyMayExist = rocksdb_key_may_exist_cf(
db.cPtr,
db.readOpts.cPtr,
cfHandle.cPtr,
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
nil,
nil,
nil,
0,
nil,
).bool

ok(keyMayExist)

proc keyExists*(
db: RocksDbRef, key: openArray[byte], cfHandle = db.defaultCfHandle
): RocksDBResult[bool] =
## Check if the key exists in the specified column family.
## Returns a result containing `true` if the key exists or a result
## containing `false` otherwise.

# TODO: Call rocksdb_key_may_exist_cf to improve performance for the case
# when the key does not exist

db.get(
key,
proc(data: openArray[byte]) =
Expand All @@ -330,15 +341,12 @@ proc delete*(
## If the value does not exist, the delete will be a no-op.
## To check if the value exists before or after a delete, use `keyExists`.

if key.len() == 0:
return err("rocksdb: key is empty")

var errors: cstring
rocksdb_delete_cf(
db.cPtr,
db.writeOpts.cPtr,
cfHandle.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
cast[cstringArray](errors.addr),
)
Expand All @@ -350,6 +358,7 @@ proc openIterator*(
db: RocksDbRef, cfHandle = db.defaultCfHandle
): RocksDBResult[RocksIteratorRef] =
## Opens an `RocksIteratorRef` for the specified column family.
## The iterator should be closed using the `close` method after usage.
doAssert not db.isClosed()

let rocksIterPtr =
Expand All @@ -361,10 +370,31 @@ proc openWriteBatch*(
db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle
): WriteBatchRef =
## Opens a `WriteBatchRef` which defaults to using the specified column family.
## The write batch should be closed using the `close` method after usage.
doAssert not db.isClosed()

createWriteBatch(cfHandle)

proc openWriteBatchWithIndex*(
db: RocksDbReadWriteRef,
reservedBytes = 0,
overwriteKey = false,
cfHandle = db.defaultCfHandle,
): WriteBatchWIRef =
## Opens a `WriteBatchWIRef` which defaults to using the specified column family.
## The write batch should be closed using the `close` method after usage.
## `WriteBatchWIRef` is similar to `WriteBatchRef` but with a binary searchable
## index built for all the keys inserted which allows reading the data which has
## been writen to the batch.
##
## Optionally set the number of bytes to be reserved for the batch by setting
## `reservedBytes`. Set `overwriteKey` to true to overwrite the key in the index
## when inserting a duplicate key, in this way an iterator will never show two
## entries with the same key.
doAssert not db.isClosed()

createWriteBatch(reservedBytes, overwriteKey, db.dbOpts, cfHandle)

proc write*(db: RocksDbReadWriteRef, updates: WriteBatchRef): RocksDBResult[void] =
## Apply the updates in the `WriteBatchRef` to the database.
doAssert not db.isClosed()
Expand All @@ -377,6 +407,18 @@ proc write*(db: RocksDbReadWriteRef, updates: WriteBatchRef): RocksDBResult[void

ok()

proc write*(db: RocksDbReadWriteRef, updates: WriteBatchWIRef): RocksDBResult[void] =
## Apply the updates in the `WriteBatchWIRef` to the database.
doAssert not db.isClosed()

var errors: cstring
rocksdb_write_writebatch_wi(
db.cPtr, db.writeOpts.cPtr, updates.cPtr, cast[cstringArray](errors.addr)
)
bailOnErrors(errors)

ok()

proc ingestExternalFile*(
db: RocksDbReadWriteRef, filePath: string, cfHandle = db.defaultCfHandle
): RocksDBResult[void] =
Expand Down
2 changes: 1 addition & 1 deletion rocksdb/rocksiterator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ proc seekToKey*(iter: RocksIteratorRef, key: openArray[byte]) =
## invalid.
##
doAssert not iter.isClosed()
rocksdb_iter_seek(iter.cPtr, cast[cstring](unsafeAddr key[0]), csize_t(key.len))
rocksdb_iter_seek(iter.cPtr, cast[cstring](key.unsafeAddrOrNil()), csize_t(key.len))

proc seekToFirst*(iter: RocksIteratorRef) =
## Seeks to the first entry in the column family.
Expand Down
6 changes: 3 additions & 3 deletions rocksdb/sstfilewriter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ proc put*(
var errors: cstring
rocksdb_sstfilewriter_put(
writer.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
cast[cstring](unsafeAddr val[0]),
cast[cstring](val.unsafeAddrOrNil()),
csize_t(val.len),
cast[cstringArray](errors.addr),
)
Expand All @@ -77,7 +77,7 @@ proc delete*(writer: SstFileWriterRef, key: openArray[byte]): RocksDBResult[void
var errors: cstring
rocksdb_sstfilewriter_delete(
writer.cPtr,
cast[cstring](unsafeAddr key[0]),
cast[cstring](key.unsafeAddrOrNil()),
csize_t(key.len),
cast[cstringArray](errors.addr),
)
Expand Down
4 changes: 1 addition & 3 deletions rocksdb/transactiondb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,11 @@ proc beginTransaction*(
## Begin a new transaction against the database. The transaction will default
## to using the specified column family. If no column family is specified
## then the default column family will be used.
##
##
doAssert not db.isClosed()

let txPtr = rocksdb_transaction_begin(db.cPtr, writeOpts.cPtr, txOpts.cPtr, nil)

newTransaction(txPtr, readOpts, writeOpts, txOpts, cfHandle)
newTransaction(txPtr, readOpts, writeOpts, txOpts, nil, cfHandle)

proc close*(db: TransactionDbRef) =
## Close the `TransactionDbRef`.
Expand Down
Loading