Skip to content

Commit

Permalink
Add support for snapshots.
Browse files Browse the repository at this point in the history
  • Loading branch information
bhartnett committed Jul 9, 2024
1 parent cf1267e commit cacd229
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 23 deletions.
8 changes: 7 additions & 1 deletion rocksdb/options/readopts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

{.push raises: [].}

import ../lib/librocksdb
import ../lib/librocksdb, ../snapshot

export snapshot.SnapshotRef, snapshot.isClosed, snapshot.getSequenceNumber

type
ReadOptionsPtr* = ptr rocksdb_readoptions_t
Expand Down Expand Up @@ -51,6 +53,10 @@ opt ignoreRangeDeletions, bool, uint8
opt deadline, int, uint64
opt ioTimeout, int, uint64

proc setSnapshot*(readOpts: ReadOptionsRef, snapshot: SnapshotRef) =
doAssert not readOpts.isClosed()
rocksdb_readoptions_set_snapshot(readOpts.cPtr, snapshot.cPtr)

proc defaultReadOptions*(autoClose = false): ReadOptionsRef {.inline.} =
let readOpts = createReadOptions(autoClose)

Expand Down
36 changes: 30 additions & 6 deletions rocksdb/rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import
./options/[dbopts, readopts, writeopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils],
./[rocksiterator, rocksresult, writebatch, writebatchwi]
./[rocksiterator, rocksresult, writebatch, writebatchwi, snapshot]

export
rocksresult, dbopts, readopts, writeopts, cfdescriptor, cfhandle, rocksiterator,
writebatch, writebatchwi
writebatch, writebatchwi, snapshot.SnapshotRef, snapshot.isClosed,
snapshot.getSequenceNumber

type
RocksDbPtr* = ptr rocksdb_t
Expand Down Expand Up @@ -355,16 +356,17 @@ proc delete*(
ok()

proc openIterator*(
db: RocksDbRef, cfHandle = db.defaultCfHandle
db: RocksDbRef,
readOpts = defaultReadOptions(autoClose = true),
cfHandle = db.defaultCfHandle,
): RocksDBResult[RocksIteratorRef] =
## Opens an `RocksIteratorRef` for the specified column family.
## The iterator should be closed using the `close` method after usage.
doAssert not db.isClosed()

let rocksIterPtr =
rocksdb_create_iterator_cf(db.cPtr, db.readOpts.cPtr, cfHandle.cPtr)
let rocksIterPtr = rocksdb_create_iterator_cf(db.cPtr, readOpts.cPtr, cfHandle.cPtr)

ok(newRocksIterator(rocksIterPtr))
ok(newRocksIterator(rocksIterPtr, readOpts))

proc openWriteBatch*(
db: RocksDbReadWriteRef, cfHandle = db.defaultCfHandle
Expand Down Expand Up @@ -442,6 +444,28 @@ proc ingestExternalFile*(

ok()

proc getSnapshot*(db: RocksDbRef): RocksDBResult[SnapshotRef] =
## Return a handle to the current DB state. Iterators created with this handle
## will all observe a stable snapshot of the current DB state. The caller must
## call ReleaseSnapshot(result) when the snapshot is no longer needed.
doAssert not db.isClosed()

let sHandle = rocksdb_create_snapshot(db.cPtr)
if sHandle.isNil():
err("rocksdb: failed to create snapshot")
else:
ok(newSnapshot(sHandle, SnapshotType.rocksdb))

proc releaseSnapshot*(db: RocksDbRef, snapshot: SnapshotRef) =
## Release a previously acquired snapshot. The caller must not use "snapshot"
## after this call.
doAssert not db.isClosed()
doAssert snapshot.kind == SnapshotType.rocksdb

if not snapshot.isClosed():
rocksdb_release_snapshot(db.cPtr, snapshot.cPtr)
snapshot.setClosed()

proc close*(db: RocksDbRef) =
## Close the `RocksDbRef` which will release the connection to the database
## and free the memory associated with it. `close` is idempotent and can
Expand Down
11 changes: 8 additions & 3 deletions rocksdb/rocksiterator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

{.push raises: [].}

import ./lib/librocksdb, ./internal/utils, ./rocksresult
import ./lib/librocksdb, ./internal/utils, ./options/readopts, ./rocksresult

export rocksresult

Expand All @@ -21,10 +21,13 @@ type

RocksIteratorRef* = ref object
cPtr: RocksIteratorPtr
readOpts: ReadOptionsRef

proc newRocksIterator*(cPtr: RocksIteratorPtr): RocksIteratorRef =
proc newRocksIterator*(
cPtr: RocksIteratorPtr, readOpts: ReadOptionsRef
): RocksIteratorRef =
doAssert not cPtr.isNil()
RocksIteratorRef(cPtr: cPtr)
RocksIteratorRef(cPtr: cPtr, readOpts: readOpts)

proc isClosed*(iter: RocksIteratorRef): bool {.inline.} =
## Returns `true` if the iterator is closed and `false` otherwise.
Expand Down Expand Up @@ -128,6 +131,8 @@ proc close*(iter: RocksIteratorRef) =
rocksdb_iter_destroy(iter.cPtr)
iter.cPtr = nil

autoCloseNonNil(iter.readOpts)

iterator pairs*(iter: RocksIteratorRef): tuple[key: seq[byte], value: seq[byte]] =
## Iterates over the key value pairs in the column family yielding them in
## the form of a tuple. The iterator is automatically closed after the
Expand Down
51 changes: 51 additions & 0 deletions rocksdb/snapshot.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

## A `SnapshotRef` represents a view of the state of the database at some point in time.

{.push raises: [].}

import ./lib/librocksdb

type
SnapshotPtr* = ptr rocksdb_snapshot_t

SnapshotType* = enum
rocksdb
transactiondb

SnapshotRef* = ref object
cPtr: SnapshotPtr
kind: SnapshotType

proc newSnapshot*(cPtr: SnapshotPtr, kind: SnapshotType): SnapshotRef =
doAssert not cPtr.isNil()
SnapshotRef(cPtr: cPtr, kind: kind)

proc isClosed*(snapshot: SnapshotRef): bool {.inline.} =
## Returns `true` if the `SnapshotRef` has been closed and `false` otherwise.
snapshot.cPtr.isNil()

proc cPtr*(snapshot: SnapshotRef): SnapshotPtr =
## Get the underlying database pointer.
doAssert not snapshot.isClosed()
snapshot.cPtr

proc kind*(snapshot: SnapshotRef): SnapshotType =
## Get the kind of the `SnapshotRef`.
snapshot.kind

proc getSequenceNumber*(snapshot: SnapshotRef): uint64 =
## Return the associated sequence number.
doAssert not snapshot.isClosed()
rocksdb_snapshot_get_sequence_number(snapshot.cPtr).uint64

proc setClosed*(snapshot: SnapshotRef) =
# The snapshot should be released from `RocksDbRef` or `TransactionDbRef`
snapshot.cPtr = nil
42 changes: 40 additions & 2 deletions rocksdb/transactiondb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import
./transactions/[transaction, txdbopts, txopts],
./columnfamily/[cfopts, cfdescriptor, cfhandle],
./internal/[cftable, utils],
./rocksresult
./[rocksiterator, rocksresult, snapshot]

export
dbopts, txdbopts, cfdescriptor, readopts, writeopts, txopts, transaction, rocksresult
dbopts, txdbopts, cfdescriptor, readopts, writeopts, txopts, transaction,
rocksiterator, rocksresult, snapshot.SnapshotRef, snapshot.isClosed,
snapshot.getSequenceNumber

type
TransactionDbPtr* = ptr rocksdb_transactiondb_t
Expand Down Expand Up @@ -120,6 +122,42 @@ proc beginTransaction*(

newTransaction(txPtr, readOpts, writeOpts, txOpts, nil, cfHandle)

proc openIterator*(
db: TransactionDbRef,
readOpts = defaultReadOptions(autoClose = true),
cfHandle = db.defaultCfHandle,
): RocksDBResult[RocksIteratorRef] =
## Opens an `RocksIteratorRef` for the specified column family.
## The iterator should be closed using the `close` method after usage.
doAssert not db.isClosed()

let rocksIterPtr =
rocksdb_transactiondb_create_iterator_cf(db.cPtr, readOpts.cPtr, cfHandle.cPtr)

ok(newRocksIterator(rocksIterPtr, readOpts))

proc getSnapshot*(db: TransactionDbRef): RocksDBResult[SnapshotRef] =
## Return a handle to the current DB state. Iterators created with this handle
## will all observe a stable snapshot of the current DB state. The caller must
## call ReleaseSnapshot(result) when the snapshot is no longer needed.
doAssert not db.isClosed()

let sHandle = rocksdb_transactiondb_create_snapshot(db.cPtr)
if sHandle.isNil():
err("rocksdb: failed to create snapshot")
else:
ok(newSnapshot(sHandle, SnapshotType.transactiondb))

proc releaseSnapshot*(db: TransactionDbRef, snapshot: SnapshotRef) =
## Release a previously acquired snapshot. The caller must not use "snapshot"
## after this call.
doAssert not db.isClosed()
doAssert snapshot.kind == SnapshotType.transactiondb

if not snapshot.isClosed():
rocksdb_transactiondb_release_snapshot(db.cPtr, snapshot.cPtr)
snapshot.setClosed()

proc close*(db: TransactionDbRef) =
## Close the `TransactionDbRef`.

Expand Down
2 changes: 1 addition & 1 deletion rocksdb/writebatchwi.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ proc isClosed*(batch: WriteBatchWIRef): bool {.inline.} =
batch.cPtr.isNil()

proc cPtr*(batch: WriteBatchWIRef): WriteBatchWIPtr =
## Get the underlying database pointer.
## Get the underlying write batch pointer.
doAssert not batch.isClosed()
batch.cPtr

Expand Down
35 changes: 35 additions & 0 deletions tests/test_rocksdb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,38 @@ suite "RocksDbRef Tests":

cfOpts.close()
removeDir($dbPath)

test "Create and restore snapshot":
check:
db.put(key, val).isOk()
db.keyExists(key).get() == true
db.keyMayExist(otherKey).get() == false

let snapshot = db.getSnapshot().get()
check:
snapshot.getSequenceNumber() > 0
not snapshot.isClosed()

check:
db.delete(key).isOk()
db.put(otherKey, val).isOk()
db.keyMayExist(key).get() == false
db.keyExists(otherKey).get() == true

let readOpts = defaultReadOptions(autoClose = true)
readOpts.setSnapshot(snapshot)
let iter = db.openIterator(readOpts = readOpts).get()
defer:
iter.close()

iter.seekToKey(key)
check:
iter.isValid() == true
iter.key() == key
iter.value() == val
iter.seekToKey(otherKey)
check:
iter.isValid() == false

db.releaseSnapshot(snapshot)
check snapshot.isClosed()
20 changes: 10 additions & 10 deletions tests/test_rocksiterator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ suite "RocksIteratorRef Tests":
removeDir($dbPath)

test "Iterate forwards using default column family":
let res = db.openIterator(defaultCfHandle)
let res = db.openIterator(cfHandle = defaultCfHandle)
check res.isOk()

var iter = res.get()
Expand All @@ -72,7 +72,7 @@ suite "RocksIteratorRef Tests":
check expected == byte(4)

test "Iterate backwards using other column family":
let res = db.openIterator(otherCfHandle)
let res = db.openIterator(cfHandle = otherCfHandle)
check res.isOk()

var iter = res.get()
Expand Down Expand Up @@ -106,12 +106,12 @@ suite "RocksIteratorRef Tests":
iter.close()

test "Open two iterators on the same column family":
let res1 = db.openIterator(defaultCfHandle)
let res1 = db.openIterator(cfHandle = defaultCfHandle)
check res1.isOk()
var iter1 = res1.get()
defer:
iter1.close()
let res2 = db.openIterator(defaultCfHandle)
let res2 = db.openIterator(cfHandle = defaultCfHandle)
check res2.isOk()
var iter2 = res2.get()
defer:
Expand All @@ -129,12 +129,12 @@ suite "RocksIteratorRef Tests":
iter2.value() == @[byte(3)]

test "Open two iterators on different column families":
let res1 = db.openIterator(defaultCfHandle)
let res1 = db.openIterator(cfHandle = defaultCfHandle)
check res1.isOk()
var iter1 = res1.get()
defer:
iter1.close()
let res2 = db.openIterator(otherCfHandle)
let res2 = db.openIterator(cfHandle = otherCfHandle)
check res2.isOk()
var iter2 = res2.get()
defer:
Expand All @@ -152,7 +152,7 @@ suite "RocksIteratorRef Tests":
iter2.value() == @[byte(3)]

test "Iterate forwards using seek to key":
let res = db.openIterator(defaultCfHandle)
let res = db.openIterator(cfHandle = defaultCfHandle)
check res.isOk()

var iter = res.get()
Expand Down Expand Up @@ -180,7 +180,7 @@ suite "RocksIteratorRef Tests":
iter.value() == val1

test "Empty column family":
let res = db.openIterator(emptyCfHandle)
let res = db.openIterator(cfHandle = emptyCfHandle)
check res.isOk()
var iter = res.get()
defer:
Expand All @@ -193,7 +193,7 @@ suite "RocksIteratorRef Tests":
check not iter.isValid()

test "Test status":
let res = db.openIterator(emptyCfHandle)
let res = db.openIterator(cfHandle = emptyCfHandle)
check res.isOk()
var iter = res.get()
defer:
Expand All @@ -204,7 +204,7 @@ suite "RocksIteratorRef Tests":
check iter.status().isOk()

test "Test pairs iterator":
let res = db.openIterator(defaultCfHandle)
let res = db.openIterator(cfHandle = defaultCfHandle)
check res.isOk()
var iter = res.get()

Expand Down
Loading

0 comments on commit cacd229

Please sign in to comment.