Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Edit to fix close connection in Reader (#94)
Browse files Browse the repository at this point in the history
* 🐛 Edit to Fix close connection in Reader

 - Also implement new unit test to check close func in JS
 - Refactor to implement js_close in ReaderBase

* 💅 Move benchmark file

---------

Co-authored-by: shuse2 <[email protected]>
  • Loading branch information
hrmhatef and shuse2 authored Feb 6, 2023
1 parent af7e9d1 commit 595667b
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 47 deletions.
4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ crate-type = ["cdylib", "lib"]
name = "bench_smt"
path = "benchmark/rust/bench_smt.rs"

[[bench]]
name = "bench_smt"
harness = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/database/reader_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ pub mod read_writer_db;
pub mod reader_base;
pub mod reader_db;

pub use reader_base::ReaderBase;
pub use reader_base::{ReaderBase, SharedReaderBase};
34 changes: 10 additions & 24 deletions src/database/reader_writer/read_writer_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ use neon::result::JsResult;
use neon::types::{buffer::TypedArray, JsBuffer, JsFunction, JsObject, JsTypedArray, JsUndefined};

use crate::database::options;
use crate::database::reader_writer::ReaderBase;
use crate::database::types::{JsBoxRef, Kind, SnapshotMessage};
use crate::database::reader_writer::{ReaderBase, SharedReaderBase};
use crate::database::types::{Kind, SnapshotMessage};
use crate::database::utils::*;
use crate::state_writer;
use crate::types::{ArcMutex, KVPair, SharedKVPair};

pub type SharedReadWriter = JsBoxRef<ReadWriter>;
pub type ReadWriter = ReaderBase;

impl ReadWriter {
/// update or insert the pair of key and value
fn upsert_key(
Expand Down Expand Up @@ -210,10 +208,10 @@ impl ReadWriter {
let key = ctx.argument::<JsTypedArray<u8>>(1)?.as_slice(&ctx).to_vec();
let value = ctx.argument::<JsTypedArray<u8>>(2)?.as_slice(&ctx).to_vec();
let callback = ctx.argument::<JsFunction>(3)?.root(&mut ctx);
// Get the `this` value as a `SharedReaderWriter`
// Get the `this` value as a `SharedReaderBase`
let db = ctx
.this()
.downcast_or_throw::<SharedReadWriter, _>(&mut ctx)?;
.downcast_or_throw::<SharedReaderBase, _>(&mut ctx)?;
let db = db.borrow();

let writer = Arc::clone(&batch.borrow_mut());
Expand All @@ -237,10 +235,10 @@ impl ReadWriter {
.downcast_or_throw::<state_writer::SendableStateWriter, _>(&mut ctx)?;
let key = ctx.argument::<JsTypedArray<u8>>(1)?.as_slice(&ctx).to_vec();
let callback = ctx.argument::<JsFunction>(2)?.root(&mut ctx);
// Get the `this` value as a `SharedReaderWriter`
// Get the `this` value as a `SharedReaderBase`
let db = ctx
.this()
.downcast_or_throw::<SharedReadWriter, _>(&mut ctx)?;
.downcast_or_throw::<SharedReaderBase, _>(&mut ctx)?;
let db = db.borrow_mut();
let writer = Arc::clone(&batch.borrow_mut());
db.get_key_with_writer(callback, writer, key)
Expand All @@ -262,10 +260,10 @@ impl ReadWriter {
.downcast_or_throw::<state_writer::SendableStateWriter, _>(&mut ctx)?;
let key = ctx.argument::<JsTypedArray<u8>>(1)?.as_slice(&ctx).to_vec();
let callback = ctx.argument::<JsFunction>(2)?.root(&mut ctx);
// Get the `this` value as a `SharedReaderWriter`
// Get the `this` value as a `SharedReaderBase`
let db = ctx
.this()
.downcast_or_throw::<SharedReadWriter, _>(&mut ctx)?;
.downcast_or_throw::<SharedReaderBase, _>(&mut ctx)?;
let db = db.borrow_mut();
let writer = Arc::clone(&batch.borrow_mut());
db.delete_key(callback, writer, key)
Expand All @@ -289,27 +287,15 @@ impl ReadWriter {
let option_inputs = ctx.argument::<JsObject>(1)?;
let options = options::IterationOption::new(&mut ctx, option_inputs);
let callback = ctx.argument::<JsFunction>(2)?.root(&mut ctx);
// Get the `this` value as a `SharedReaderWriter`
// Get the `this` value as a `SharedReaderBase`
let db = ctx
.this()
.downcast_or_throw::<SharedReadWriter, _>(&mut ctx)?;
.downcast_or_throw::<SharedReaderBase, _>(&mut ctx)?;
let db = db.borrow_mut();
let writer = Arc::clone(&batch.borrow_mut());
db.range(callback, writer, options)
.or_else(|err| ctx.throw_error(err.to_string()))?;

Ok(ctx.undefined())
}

/// js_close is handler for JS ffi.
/// js "this" - ReadWriter.
pub fn js_close(mut ctx: FunctionContext) -> JsResult<JsUndefined> {
let db = ctx
.this()
.downcast_or_throw::<SharedReadWriter, _>(&mut ctx)?;
let db = db.borrow_mut();
db.close().or_else(|err| ctx.throw_error(err.to_string()))?;

Ok(ctx.undefined())
}
}
28 changes: 21 additions & 7 deletions src/database/reader_writer/reader_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use neon::context::{Context, FunctionContext};
use neon::event::Channel;
use neon::handle::{Handle, Root};
use neon::result::JsResult;
use neon::types::{Finalize, JsBuffer, JsFunction, JsValue};
use neon::types::{Finalize, JsBuffer, JsFunction, JsUndefined, JsValue};

use crate::database::traits::Unwrap;
use crate::database::types::{JsBoxRef, Kind, SnapshotMessage};
Expand All @@ -23,7 +23,14 @@ impl Finalize for ReaderBase {
}
}

pub type SharedReaderBase = JsBoxRef<ReaderBase>;
impl ReaderBase {
/// Idiomatic rust would take an owned `self` to prevent use after close
/// However, it's not possible to prevent JavaScript from continuing to hold a closed database
fn close(&self) -> Result<(), mpsc::SendError<SnapshotMessage>> {
self.tx.send(SnapshotMessage::Close)
}

/// js_new is handler for JS ffi.
/// - @params(0) - StateDB to create the reader from.
/// - @returns - Reader where it is snapshot of stateDB.
Expand Down Expand Up @@ -52,12 +59,6 @@ impl ReaderBase {
Ok(ctx.boxed(RefCell::new(Self { tx })))
}

/// Idiomatic rust would take an owned `self` to prevent use after close
/// However, it's not possible to prevent JavaScript from continuing to hold a closed database
pub fn close(&self) -> Result<(), mpsc::SendError<SnapshotMessage>> {
self.tx.send(SnapshotMessage::Close)
}

pub fn send(
&self,
callback: impl FnOnce(&rocksdb::Snapshot, &Channel) + Send + 'static,
Expand Down Expand Up @@ -92,4 +93,17 @@ impl ReaderBase {
});
})
}

/// js_close is handler for JS ffi.
/// js "this" - ReaderBase.
/// ReaderBase is a base struct so, it is possible to use js_close in Reader & ReadWriter
pub fn js_close(mut ctx: FunctionContext) -> JsResult<JsUndefined> {
let db = ctx
.this()
.downcast_or_throw::<SharedReaderBase, _>(&mut ctx)?;
let db = db.borrow_mut();
db.close().or_else(|err| ctx.throw_error(err.to_string()))?;

Ok(ctx.undefined())
}
}
12 changes: 5 additions & 7 deletions src/database/reader_writer/reader_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ use neon::types::buffer::TypedArray;
use neon::types::{JsBoolean, JsFunction, JsObject, JsTypedArray, JsUndefined, JsValue};

use crate::database::options::IterationOption;
use crate::database::reader_writer::ReaderBase;
use crate::database::types::{JsBoxRef, Kind, SnapshotMessage};
use crate::database::reader_writer::{ReaderBase, SharedReaderBase};
use crate::database::types::{Kind, SnapshotMessage};
use crate::database::utils::*;
use crate::types::KVPair;

pub type Reader = ReaderBase;
pub type SharedReaderDB = JsBoxRef<Reader>;

impl Reader {
fn exists(
&self,
Expand Down Expand Up @@ -60,7 +58,7 @@ impl Reader {
let callback = ctx.argument::<JsFunction>(1)?.root(&mut ctx);
let db = ctx
.this()
.downcast_or_throw::<SharedReaderDB, _>(&mut ctx)?;
.downcast_or_throw::<SharedReaderBase, _>(&mut ctx)?;

let db = db.borrow_mut();
db.get_by_key(key, callback)
Expand All @@ -80,7 +78,7 @@ impl Reader {
let callback = ctx.argument::<JsFunction>(1)?.root(&mut ctx);
let db = ctx
.this()
.downcast_or_throw::<SharedReaderDB, _>(&mut ctx)?;
.downcast_or_throw::<SharedReaderBase, _>(&mut ctx)?;

let db = db.borrow_mut();
db.exists(key, callback)
Expand All @@ -105,7 +103,7 @@ impl Reader {

let db = ctx
.this()
.downcast_or_throw::<SharedReaderDB, _>(&mut ctx)?;
.downcast_or_throw::<SharedReaderBase, _>(&mut ctx)?;
let db = db.borrow();

let callback_on_data = Arc::new(Mutex::new(callback_on_data));
Expand Down
7 changes: 3 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ use state_writer::StateWriter;
#[neon::main]
fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function(
"db_new",
Database::js_new_with_box_ref::<DbOptions, Database>,
)?;
let db_new = Database::js_new_with_box_ref::<DbOptions, Database>;
cx.export_function("db_new", db_new)?;
cx.export_function("db_clear", Database::js_clear)?;
cx.export_function("db_close", Database::js_close)?;
cx.export_function("db_get", Database::js_get)?;
Expand All @@ -47,6 +45,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function("db_checkpoint", Database::js_checkpoint)?;

cx.export_function("state_db_reader_new", reader_db::Reader::js_new)?;
cx.export_function("state_db_reader_close", reader_db::Reader::js_close)?;
cx.export_function("state_db_reader_get", reader_db::Reader::js_get)?;
cx.export_function("state_db_reader_exists", reader_db::Reader::js_exists)?;
cx.export_function("state_db_reader_iterate", reader_db::Reader::js_iterate)?;
Expand Down
5 changes: 5 additions & 0 deletions state_db.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {
state_writer_snapshot,
state_writer_restore_snapshot,
state_db_reader_new,
state_db_reader_close,
state_db_reader_get,
state_db_reader_exists,
state_db_reader_iterate,
Expand All @@ -52,6 +53,10 @@ class StateReader {
this._db = state_db_reader_new(db);
}

close() {
state_db_reader_close.call(this._db);
}

async get(key) {
return new Promise((resolve, reject) => {
state_db_reader_get.call(this._db, key, (err, result) => {
Expand Down
7 changes: 7 additions & 0 deletions test/statedb.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,13 @@ describe('statedb', () => {

expect(values).toEqual(initState.slice(1, 3));
});

it('should throw an error when the reader is closed', async () => {
const reader = db.newReader();
await expect(reader.get(initState[0].key)).resolves.toEqual(initState[0].value);
reader.close();
expect(() => reader.get(initState[1].key)).rejects.toThrow();
});
});

describe('checkpoint', () => {
Expand Down
1 change: 1 addition & 0 deletions types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ declare class StateReader {
has(key: Buffer): Promise<boolean>;
iterate(options?: IterateOptions): NodeJS.ReadableStream;
createReadStream(options?: IterateOptions): NodeJS.ReadableStream;
close(): void;
}

declare class StateReadWriter {
Expand Down

0 comments on commit 595667b

Please sign in to comment.