From e005be2d6b4d4f5f46b6c4648b34e37c454da33e Mon Sep 17 00:00:00 2001 From: Farhan Date: Thu, 8 Feb 2024 18:20:10 +0530 Subject: [PATCH] feat: make store async --- .github/workflows/crates.yml | 28 ++ .gitignore | 1 - Cargo.lock | 438 +++++++++++++++++------ Cargo.toml | 28 +- SECURITY.md | 12 + benches/store_bench.rs | 144 +++++--- release.toml | 7 + src/lib.rs | 5 + src/storage/cache/s3fifo.rs | 2 +- src/storage/index/mod.rs | 6 + src/storage/index/node.rs | 2 +- src/storage/kv/entry.rs | 40 +-- src/storage/kv/error.rs | 25 +- src/storage/kv/mod.rs | 2 +- src/storage/kv/oracle.rs | 11 +- src/storage/kv/store.rs | 331 ++++++++++++++++-- src/storage/kv/transaction.rs | 630 ++++++++++++++++++---------------- src/storage/kv/util.rs | 9 + src/storage/log/mod.rs | 12 +- src/storage/mod.rs | 6 +- 20 files changed, 1199 insertions(+), 540 deletions(-) create mode 100644 .github/workflows/crates.yml create mode 100644 SECURITY.md create mode 100644 release.toml diff --git a/.github/workflows/crates.yml b/.github/workflows/crates.yml new file mode 100644 index 00000000..89ad7e37 --- /dev/null +++ b/.github/workflows/crates.yml @@ -0,0 +1,28 @@ +name: Release crate + +run-name: "Release crate" + +on: + workflow_dispatch: + +defaults: + run: + shell: bash + +jobs: + publish-crate: + name: Publish crate + runs-on: ubuntu-latest + steps: + - name: Install stable toolchain + uses: dtolnay/rust-toolchain@stable + with: + toolchain: stable + + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Publish crate + env: + CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} + run: cargo publish diff --git a/.gitignore b/.gitignore index f636b5ba..a976d117 100644 --- a/.gitignore +++ b/.gitignore @@ -41,7 +41,6 @@ Temporary Items /bin/ /docker/ /.direnv/ -/testdata/ # ----------------------------------- # Specific diff --git a/Cargo.lock b/Cargo.lock index 8e0ac5e7..21a75965 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ahash" version = "0.8.6" @@ -9,6 +24,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" dependencies = [ "cfg-if", + "getrandom", "once_cell", "version_check", "zerocopy", @@ -57,10 +73,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] -name = "async-task" -version = "4.4.0" +name = "async-channel" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" +checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" +dependencies = [ + "concurrent-queue", + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] [[package]] name = "autocfg" @@ -68,6 +91,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -81,15 +119,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" [[package]] -name = "bitvec" -version = "1.0.1" +name = "block-buffer" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ - "funty", - "radium", - "tap", - "wyz", + "generic-array", ] [[package]] @@ -104,17 +139,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" -[[package]] -name = "caches" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0c952ebd9e8c333bc49ded7f5c894cdfff85fef27b342b43a2659cc1a338173" -dependencies = [ - "bitvec", - "getrandom", - "rand 0.8.5", -] - [[package]] name = "cast" version = "0.3.0" @@ -202,12 +226,30 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation-sys" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "cpufeatures" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -228,6 +270,7 @@ dependencies = [ "ciborium", "clap", "criterion-plot", + "futures", "is-terminal", "itertools", "num-traits", @@ -240,6 +283,7 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -320,12 +364,38 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" version = "0.3.8" @@ -337,12 +407,24 @@ dependencies = [ ] [[package]] -name = "fastrand" -version = "1.9.0" +name = "event-listener" +version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" dependencies = [ - "instant", + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener", + "pin-project-lite", ] [[package]] @@ -352,61 +434,109 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] -name = "flume" -version = "0.11.0" +name = "fuchsia-cprng" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ + "futures-channel", "futures-core", + "futures-executor", + "futures-io", "futures-sink", - "nanorand", - "spin", + "futures-task", + "futures-util", ] [[package]] -name = "fuchsia-cprng" -version = "0.1.1" +name = "futures-channel" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] [[package]] -name = "funty" -version = "2.0.0" +name = "futures-core" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] -name = "futures-core" -version = "0.3.29" +name = "futures-executor" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] [[package]] name = "futures-io" -version = "0.3.29" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] -name = "futures-lite" -version = "1.13.0" +name = "futures-task" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ - "fastrand 1.9.0", + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", + "futures-task", "memchr", - "parking", "pin-project-lite", - "waker-fn", + "pin-utils", + "slab", ] [[package]] -name = "futures-sink" -version = "0.3.29" +name = "generic-array" +version = "0.14.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] [[package]] name = "getrandom" @@ -415,12 +545,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", - "js-sys", "libc", "wasi", - "wasm-bindgen", ] +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + [[package]] name = "half" version = "1.8.2" @@ -466,15 +600,6 @@ dependencies = [ "cc", ] -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", -] - [[package]] name = "is-terminal" version = "0.4.9" @@ -560,9 +685,9 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "lru" -version = "0.12.0" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60" +checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22" dependencies = [ "hashbrown", ] @@ -583,21 +708,32 @@ dependencies = [ ] [[package]] -name = "nanoid" -version = "0.4.0" +name = "miniz_oxide" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" dependencies = [ - "rand 0.8.5", + "adler", ] [[package]] -name = "nanorand" -version = "0.7.0" +name = "mio" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ - "getrandom", + "libc", + "wasi", + "windows-sys 0.48.0", +] + +[[package]] +name = "nanoid" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8" +dependencies = [ + "rand 0.8.5", ] [[package]] @@ -609,6 +745,25 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -623,9 +778,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "parking" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" [[package]] name = "parking_lot" @@ -656,6 +811,12 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "plotters" version = "0.3.5" @@ -699,6 +860,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quick_cache" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58c20af3800cee5134b79a3bd4a3d4b583c16ccfa5f53338f46400851a5b3819" +dependencies = [ + "ahash", + "equivalent", + "hashbrown", + "parking_lot", +] + [[package]] name = "quote" version = "1.0.35" @@ -708,12 +881,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "radium" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" - [[package]] name = "rand" version = "0.4.6" @@ -848,6 +1015,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "rustix" version = "0.38.28" @@ -913,6 +1086,35 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.11.1" @@ -920,37 +1122,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] -name = "spin" -version = "0.9.8" +name = "socket2" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ - "lock_api", + "libc", + "windows-sys 0.48.0", ] [[package]] name = "surrealkv" version = "0.1.0" dependencies = [ - "async-task", + "async-channel", "bytes", - "caches", "chrono", "crc32fast", "criterion", "crossbeam", "crossbeam-channel", - "fastrand 2.0.1", - "flume", - "futures-lite", + "fastrand", + "futures", "hashbrown", "jemallocator", "lru", "nanoid", - "once_cell", "parking_lot", + "quick_cache", "rand 0.8.5", + "sha2", "tempdir", + "tokio", ] [[package]] @@ -964,12 +1167,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "tap" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" - [[package]] name = "tempdir" version = "0.3.7" @@ -990,6 +1187,42 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tokio" +version = "1.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "typenum" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" + [[package]] name = "unicode-ident" version = "1.0.11" @@ -1002,12 +1235,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "walkdir" version = "2.4.0" @@ -1260,15 +1487,6 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" -[[package]] -name = "wyz" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" -dependencies = [ - "tap", -] - [[package]] name = "zerocopy" version = "0.7.25" diff --git a/Cargo.toml b/Cargo.toml index d7c787da..f59efb4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,17 @@ [package] name = "surrealkv" +publish = true version = "0.1.0" edition = "2021" +license = "Apache-2.0" +readme = "README.md" +description = "A low-level, versioned, embedded, ACID-compliant, key-value database for Rust" +repository = "https://github.com/surrealdb/surrealkv" +homepage = "https://github.com/surrealdb/surrealkv" +documentation = "https://docs.rs/surrealkv/" +keywords = ["lmdb", "rocksdb", "sled", "redb", "tikv"] +categories = ["database-implementations", "concurrency", "data-structures"] +rust-version = "1.75" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -13,23 +23,23 @@ crossbeam-channel = "0.5.8" parking_lot = "0.12.1" hashbrown = "0.14.2" lru = "0.12.0" - -# below is for the async runtime -async-task = "4.4.0" -futures-lite = "1.12.0" -once_cell = "1.17.1" -flume = "0.11.0" +async-channel = "2.1.1" +futures = "0.3.30" bytes = "1.5.0" +tokio = { version = "1.36", features = ["rt"] } +sha2 = "0.10.8" +quick_cache = "0.4.0" [dev-dependencies] +tokio = { version = "1", features = ["full"] } tempdir = "0.3" rand = "0.8.5" -criterion = "0.5.1" +criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] } jemallocator = "0.5.4" nanoid = "0.4.0" -caches = "0.2" fastrand = "2.0.1" + [[bench]] -name = "s3fifo_bench" +name = "store_bench" harness = false diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 00000000..427aecda --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,12 @@ +# Security Policy + +## Reporting a Vulnerability + +We take the security of SurrealDB code, software, and cloud platform very +seriously. If you believe you have found a security vulnerability in +SurrealDB, we encourage you to let us know right away. We will investigate +all legitimate reports and do our best to quickly fix the problem. + +Please report any issues or vulnerabilities to security@surrealdb.com, +instead of posting a public issue in GitHub. Please include the version +identifier, and details on how the vulnerability can be exploited. diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 956b7747..9eedc618 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -1,8 +1,12 @@ +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Arc; + use criterion::{criterion_group, criterion_main, Criterion}; use jemallocator::Jemalloc; -use surrealkv::storage::kv::option::Options; -use surrealkv::storage::kv::store::Store; +use surrealkv::Options; +use surrealkv::Store; use tempdir::TempDir; #[cfg_attr(any(target_os = "linux", target_os = "macos"), global_allocator)] @@ -13,74 +17,96 @@ fn create_temp_directory() -> TempDir { } fn bulk_insert(c: &mut Criterion) { - let mut count = 0_u32; - let mut bytes = |len| -> Vec { - count += 1; + let count = AtomicU32::new(0_u32); + let bytes = |len| -> Vec { count + .fetch_add(1, Relaxed) .to_be_bytes() .into_iter() .cycle() .take(len) - .clone() .collect() }; let mut bench = |key_len, val_len| { - let mut opts = Options::new(); - opts.dir = create_temp_directory().path().to_path_buf(); - let db = Store::new(opts).expect("should create store"); + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(8) + .enable_all() + .build() + .unwrap(); + + let db = rt.block_on(async { + let mut opts = Options::new(); + opts.dir = create_temp_directory().path().to_path_buf(); + Store::new(opts).expect("should create store") + }); c.bench_function( - &format!("bulk insert key/value lengths {}/{}", key_len, val_len), + &format!("bulk load key/value lengths {}/{}", key_len, val_len), |b| { - b.iter(|| { + b.to_async(&rt).iter(|| async { let mut txn = db.begin().unwrap(); txn.set(bytes(key_len)[..].into(), bytes(val_len)[..].into()) .unwrap(); - txn.commit().unwrap(); + txn.commit().await.unwrap(); }) }, ); + rt.shutdown_background(); }; for key_len in &[10_usize, 128, 256, 512] { for val_len in &[0_usize, 10, 128, 256, 512, 1024, 2048, 4096, 8192] { - bench(*key_len, *val_len) + bench(*key_len, *val_len); } } } fn sequential_insert_read(c: &mut Criterion) { - let mut max_count = 0_u32; - let mut opts = Options::new(); - opts.dir = create_temp_directory().path().to_path_buf(); - let db = Store::new(opts).expect("should create store"); - - c.bench_function("sequential inserts", |b| { - let mut count = 0_u32; - b.iter(|| { - count += 1; - let mut txn = db.begin().unwrap(); - txn.set(count.to_be_bytes()[..].into(), vec![][..].into()) + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(8) + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + let max_count = AtomicU32::new(0_u32); + let mut opts = Options::new(); + opts.dir = create_temp_directory().path().to_path_buf(); + let db = Store::new(opts).expect("should create store"); + + c.bench_function(&format!("sequential inserts"), |b| { + let count = AtomicU32::new(0_u32); + b.iter(|| async { + let mut txn = db.begin().unwrap(); + txn.set( + count.fetch_add(1, Relaxed).to_be_bytes()[..].into(), + vec![][..].into(), + ) .unwrap(); - txn.commit().unwrap(); - if count > max_count { - max_count = count; - } + txn.commit().await.unwrap(); + + let current_count = count.load(Relaxed); + if current_count > max_count.load(Relaxed) { + max_count.store(current_count, Relaxed); + } + }) }); - }); - c.bench_function("sequential gets", |b| { - let mut count = 0_u32; - b.iter(|| { - count += 1; - // not sure why this exceeds the max_count - if count <= max_count { - let txn = db.begin().unwrap(); - txn.get(count.to_be_bytes()[..].into()).unwrap(); - } - }) + c.bench_function(&format!("sequential gets"), |b| { + let count = AtomicU32::new(0_u32); + b.iter(|| async { + count.fetch_add(1, Relaxed); + + let current_count = count.load(Relaxed); + if current_count <= max_count.load(Relaxed) { + let txn = db.begin().unwrap(); + txn.get(¤t_count.to_be_bytes()[..]).unwrap(); + } + }) + }); }); + rt.shutdown_background(); } fn concurrent_insert(c: &mut Criterion) { @@ -90,38 +116,56 @@ fn concurrent_insert(c: &mut Criterion) { group.sample_size(10); group.throughput(criterion::Throughput::Elements(item_count as u64)); - let mut opts = Options::new(); - opts.dir = create_temp_directory().path().to_path_buf(); - let db = Store::new(opts).expect("should create store"); + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(8) + .enable_all() + .build() + .unwrap(); + + let db = rt.block_on(async { + let mut opts = Options::new(); + opts.dir = create_temp_directory().path().to_path_buf(); + opts.max_tx_entries = item_count; + Arc::new(Store::new(opts).expect("should create store")) + }); - for thread_count in [1_u32, 2, 4] { + { + let thread_count = 8_u32; group.bench_function( format!("{} inserts ({} threads)", item_count, thread_count), |b| { b.iter(|| { - let mut threads = vec![]; + let mut handles = vec![]; for _ in 0..thread_count { let db = db.clone(); - threads.push(std::thread::spawn(move || { + let handle = rt.spawn(async move { + let mut txn = db.begin().unwrap(); for _ in 0..(item_count / thread_count) { let key = nanoid::nanoid!(); let value = nanoid::nanoid!(); - let mut txn = db.begin().unwrap(); txn.set(key.as_bytes(), value.as_bytes()).unwrap(); - txn.commit().unwrap(); } - })); + txn.commit().await.unwrap(); + }); + + handles.push(handle); } - for thread in threads { - thread.join().unwrap(); + for handle in handles { + rt.block_on(handle).unwrap(); } }) }, ); } + + rt.block_on(async { + db.close().await.unwrap(); + }); + + rt.shutdown_background(); } criterion_group!(benches_sequential, bulk_insert, sequential_insert_read); diff --git a/release.toml b/release.toml new file mode 100644 index 00000000..26745014 --- /dev/null +++ b/release.toml @@ -0,0 +1,7 @@ +pre-release-commit-message = "Release version" +sign-commit = false +tag-message = "" +tag-prefix = "" +publish = false +push = false +tag = true diff --git a/src/lib.rs b/src/lib.rs index 30f61eb6..78da214a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,6 @@ pub mod storage; + +pub use storage::kv::error::{Error, Result}; +pub use storage::kv::option::Options; +pub use storage::kv::store::Store; +pub use storage::kv::transaction::Transaction; diff --git a/src/storage/cache/s3fifo.rs b/src/storage/cache/s3fifo.rs index 5711c7a8..6985238e 100644 --- a/src/storage/cache/s3fifo.rs +++ b/src/storage/cache/s3fifo.rs @@ -122,7 +122,7 @@ where fn evict(&mut self) { if self.small.len() + self.main.len() >= self.max_cache_size { - if self.main.len() >= self.max_main_size || self.small.len() == 0 { + if self.main.len() >= self.max_main_size || self.small.is_empty() { self.evict_m(); } else { self.evict_s(); diff --git a/src/storage/index/mod.rs b/src/storage/index/mod.rs index 1b7bb707..c58891f2 100644 --- a/src/storage/index/mod.rs +++ b/src/storage/index/mod.rs @@ -278,6 +278,12 @@ pub struct BitSet { bits: [bool; SIZE], } +impl Default for BitSet { + fn default() -> Self { + Self::new() + } +} + impl BitSet { pub fn new() -> Self { Self { diff --git a/src/storage/index/node.rs b/src/storage/index/node.rs index c685fd2e..24662709 100644 --- a/src/storage/index/node.rs +++ b/src/storage/index/node.rs @@ -273,7 +273,7 @@ impl NodeTrait for FlatN let mut new_node = Self::new(self.prefix.clone()); for i in 0..self.num_children as usize { new_node.keys[i] = self.keys[i]; - new_node.children[i] =self.children[i].clone(); + new_node.children[i] = self.children[i].clone(); } new_node.num_children = self.num_children; new_node.version = self.version; diff --git a/src/storage/kv/entry.rs b/src/storage/kv/entry.rs index f9ecd97b..68b2b245 100644 --- a/src/storage/kv/entry.rs +++ b/src/storage/kv/entry.rs @@ -447,7 +447,7 @@ impl ValueRef { /// Otherwise, it reads the value from the commit log, caches it, and returns it. fn resolve_from_offset(&self, value_offset: u64) -> Result> { // Check if the offset exists in value_cache and return if found - if let Some(value) = self.store.value_cache.write().get(&value_offset) { + if let Some(value) = self.store.value_cache.get(&value_offset) { return Ok(value.to_vec()); } @@ -459,8 +459,7 @@ impl ValueRef { // Store the offset and value in value_cache self.store .value_cache - .write() - .push(value_offset, Bytes::from(buf.clone())); + .insert(value_offset, Bytes::from(buf.clone())); Ok(buf) } @@ -469,8 +468,8 @@ impl ValueRef { #[cfg(test)] mod tests { use super::*; + use crate::storage::kv::option::Options; - use crate::storage::kv::store::Core; use crate::storage::kv::store::Store; use tempdir::TempDir; @@ -479,8 +478,8 @@ mod tests { TempDir::new("test").unwrap() } - #[test] - fn encode_decode() { + #[tokio::test] + async fn encode_decode() { // Create a sample valueRef instance // Create a temporary directory for testing let temp_dir = create_temp_directory(); @@ -489,14 +488,15 @@ mod tests { let mut opts = Options::new(); opts.dir = temp_dir.path().to_path_buf(); - let store = Arc::new(Core::new(opts).expect("failed to create store")); + // Create a new Core instance with VariableKey as the key type + let store = Store::new(opts).expect("should create store"); let mut txmd = Metadata::new(); txmd.as_deleted(true).expect("failed to set deleted"); let mut kvmd = Metadata::new(); kvmd.as_deleted(true).expect("failed to set deleted"); - let mut value_ref = ValueRef::new(store); + let mut value_ref = ValueRef::new(store.core.clone()); value_ref.value_length = 100; value_ref.value_offset = Some(200); value_ref.key_value_metadata = Some(kvmd); @@ -518,8 +518,8 @@ mod tests { // ); } - #[test] - fn txn_with_value_read_from_clog() { + #[tokio::test] + async fn txn_with_value_read_from_clog() { // Create a temporary directory for testing let temp_dir = create_temp_directory(); @@ -546,7 +546,7 @@ mod tests { txn.set(&key2, &value).unwrap(); // Commit the transaction - txn.commit().unwrap(); + txn.commit().await.unwrap(); } { @@ -554,7 +554,7 @@ mod tests { let txn = store.begin().unwrap(); // Retrieve the value associated with key1 - let val = txn.get(&key1).unwrap(); + let val = txn.get(&key1).unwrap().unwrap(); // Assert that the value retrieved in txn matches the expected value assert_eq!(&val[..], value.as_ref()); @@ -565,7 +565,7 @@ mod tests { let txn = store.begin().unwrap(); // Retrieve the value associated with key2 - let val = txn.get(&key2).unwrap(); + let val = txn.get(&key2).unwrap().unwrap(); // Assert that the value retrieved in txn matches the expected value assert_eq!(val, value); @@ -579,7 +579,7 @@ mod tests { txn.set(&key3, &value).unwrap(); // Commit the transaction - txn.commit().unwrap(); + txn.commit().await.unwrap(); } { @@ -587,15 +587,15 @@ mod tests { let txn = store.begin().unwrap(); // Retrieve the value associated with key3 - let val = txn.get(&key3).unwrap(); + let val = txn.get(&key3).unwrap().unwrap(); // Assert that the value retrieved in txn matches the expected value assert_eq!(val, value); } } - #[test] - fn txn_with_value_read_from_memory() { + #[tokio::test] + async fn txn_with_value_read_from_memory() { // Create a temporary directory for testing let temp_dir = create_temp_directory(); @@ -621,7 +621,7 @@ mod tests { txn.set(&key2, &value).unwrap(); // Commit the transaction - txn.commit().unwrap(); + txn.commit().await.unwrap(); } { @@ -629,7 +629,7 @@ mod tests { let txn = store.begin().unwrap(); // Retrieve the value associated with key1 - let val = txn.get(&key1).unwrap(); + let val = txn.get(&key1).unwrap().unwrap(); // Assert that the value retrieved in txn matches the expected value assert_eq!(&val[..], value.as_ref()); @@ -640,7 +640,7 @@ mod tests { let txn = store.begin().unwrap(); // Retrieve the value associated with key2 - let val = txn.get(&key2).unwrap(); + let val = txn.get(&key2).unwrap().unwrap(); // Assert that the value retrieved in txn matches the expected value assert_eq!(val, value); diff --git a/src/storage/kv/error.rs b/src/storage/kv/error.rs index 290a979b..1fd2fff2 100644 --- a/src/storage/kv/error.rs +++ b/src/storage/kv/error.rs @@ -1,5 +1,6 @@ use std::{fmt, io, sync::Arc}; +use crate::storage::kv::store::Task; use crate::storage::{index::art::TrieError, log::Error as LogError}; /// Result returning Error @@ -7,7 +8,7 @@ pub type Result = std::result::Result; /// `Error` is a custom error type for the storage module. /// It includes various variants to represent different types of errors that can occur. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum Error { Abort, // The operation was aborted IoError(Arc), // An I/O error occurred @@ -32,6 +33,8 @@ pub enum Error { EmptyValue, // The value in the record is empty ManifestNotFound, // The manifest was not found MaxTransactionEntriesLimitExceeded, // The maximum number of entries in a transaction was exceeded + SendError(String), + ReceiveError(String), } /// Error structure for encoding errors @@ -91,6 +94,8 @@ impl fmt::Display for Error { Error::MaxTransactionEntriesLimitExceeded => { write!(f, "Max transaction entries limit exceeded") } + Error::SendError(err) => write!(f, "Send error: {}", err), + Error::ReceiveError(err) => write!(f, "Receive error: {}", err), } } } @@ -116,3 +121,21 @@ impl From for Error { Error::LogError(log_error) } } + +impl From> for Error { + fn from(error: async_channel::SendError) -> Self { + Error::SendError(format!("Async channel send error: {}", error)) + } +} + +impl From>> for Error { + fn from(error: async_channel::SendError>) -> Self { + Error::SendError(format!("Async channel send error: {}", error)) + } +} + +impl From for Error { + fn from(error: async_channel::RecvError) -> Self { + Error::ReceiveError(format!("Async channel receive error: {}", error)) + } +} diff --git a/src/storage/kv/mod.rs b/src/storage/kv/mod.rs index 56918163..4af000ba 100644 --- a/src/storage/kv/mod.rs +++ b/src/storage/kv/mod.rs @@ -3,7 +3,7 @@ pub mod error; pub(crate) mod indexer; pub(crate) mod meta; pub mod option; -pub mod oracle; +pub(crate) mod oracle; pub(crate) mod reader; pub mod snapshot; pub mod store; diff --git a/src/storage/kv/oracle.rs b/src/storage/kv/oracle.rs index ce692f8c..603e2e92 100644 --- a/src/storage/kv/oracle.rs +++ b/src/storage/kv/oracle.rs @@ -11,6 +11,7 @@ use bytes::Bytes; use crossbeam_channel::{bounded, Receiver, Sender}; use hashbrown::{HashMap, HashSet}; use parking_lot::{Mutex, RwLock}; +use tokio::sync::Mutex as AsyncMutex; use crate::storage::{ index::art::TrieError, @@ -27,7 +28,7 @@ use crate::storage::{ /// It supports two isolation levels: SnapshotIsolation and SerializableSnapshotIsolation. pub(crate) struct Oracle { /// Write lock to ensure that only one transaction can commit at a time. - pub(crate) write_lock: Mutex<()>, + pub(crate) write_lock: AsyncMutex<()>, /// Isolation level of the transactions. isolation: IsolationLevel, } @@ -46,7 +47,7 @@ impl Oracle { }; Self { - write_lock: Mutex::new(()), + write_lock: AsyncMutex::new(()), isolation, } } @@ -159,7 +160,7 @@ impl SnapshotIsolation { /// are still valid in the latest snapshot, and if the timestamp of the read keys matches the timestamp /// of the latest snapshot. If the timestamp does not match, then there is a conflict. pub(crate) fn new_commit_ts(&self, txn: &mut Transaction) -> Result { - let current_snapshot = Snapshot::take(txn.store.clone(), self.read_ts())?; + let current_snapshot = Snapshot::take(txn.core.clone(), self.read_ts())?; let read_set = txn.read_set.lock(); for (key, ts) in read_set.iter() { @@ -333,7 +334,9 @@ impl SerializableSnapshotIsolation { assert!(ts >= commit_tracker.last_cleanup_ts); // Add the transaction to the list of committed transactions with conflict keys. - let conflict_keys = txn.write_set.keys().cloned().collect(); + let conflict_keys: HashSet = + txn.write_set.iter().map(|(key, _)| key.clone()).collect(); + commit_tracker .committed_transactions .push(CommitMarker { ts, conflict_keys }); diff --git a/src/storage/kv/store.rs b/src/storage/kv/store.rs index 42fc7ba0..e2dc3183 100644 --- a/src/storage/kv/store.rs +++ b/src/storage/kv/store.rs @@ -1,16 +1,20 @@ +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::vec; -use std::{num::NonZeroUsize, sync::atomic::AtomicBool}; -use bytes::Bytes; +use async_channel::{bounded, Receiver, Sender}; +use futures::{select, FutureExt}; +use tokio::task::{spawn, JoinHandle}; + +use bytes::{Bytes, BytesMut}; use hashbrown::HashMap; -use lru::LruCache; use parking_lot::RwLock; +use quick_cache::sync::Cache; use crate::storage::{ index::art::KV, kv::{ - entry::{TxRecord, ValueRef}, + entry::{Entry, TxRecord, ValueRef}, error::{Error, Result}, indexer::Indexer, option::Options, @@ -25,9 +29,11 @@ use crate::storage::{ }; /// An MVCC-based transactional key-value store. -#[derive(Clone)] pub struct Store { pub(crate) core: Arc, + pub(crate) is_closed: AtomicBool, + stop_tx: Sender<()>, + task_runner_handle: Arc>>>, } impl Store { @@ -35,8 +41,19 @@ impl Store { /// It creates a new core with the options and wraps it in an atomic reference counter. /// It returns the store. pub fn new(opts: Options) -> Result { - let core = Arc::new(Core::new(opts)?); - Ok(Self { core }) + // TODO: make this channel size configurable + let (writes_tx, writes_rx) = bounded(10000); + let (stop_tx, stop_rx) = bounded(1); + + let core = Arc::new(Core::new(opts, writes_tx)?); + let task_runner_handle = TaskRunner::new(core.clone(), writes_rx, stop_rx).spawn(); + + Ok(Self { + core, + stop_tx, + is_closed: AtomicBool::new(false), + task_runner_handle: Arc::new(RwLock::new(Some(task_runner_handle))), + }) } /// Begins a new read-write transaction. @@ -68,22 +85,84 @@ impl Store { /// Executes a function in a read-write transaction and commits the transaction. /// It begins a new read-write transaction, executes the function with the transaction, and commits the transaction. /// It returns the result of the function. - pub fn write(self: Arc, f: impl FnOnce(&mut Transaction) -> Result<()>) -> Result<()> { + pub async fn write( + self: Arc, + f: impl FnOnce(&mut Transaction) -> Result<()>, + ) -> Result<()> { let mut txn = self.begin_with_mode(Mode::ReadWrite)?; f(&mut txn)?; - txn.commit()?; + txn.commit().await?; Ok(()) } + + /// Closes the store. It sends a stop signal to the writer and waits for the done signal. + pub async fn close(&self) -> Result<()> { + if self.is_closed.load(std::sync::atomic::Ordering::SeqCst) { + return Ok(()); + } + + // Send stop signal + self.stop_tx + .send(()) + .await + .map_err(|e| Error::SendError(format!("{}", e)))?; + + // Wait for task to finish + if let Some(handle) = self.task_runner_handle.write().take() { + handle.await.map_err(|e| { + Error::ReceiveError(format!( + "Error occurred while closing the kv store. JoinError: {}", + e.to_string() + )) + })?; + } + + self.core.close()?; + Ok(()) + } } -impl Drop for Store { - /// Drops the store by closing the core. - /// If closing the core fails, it panics with an error message. - fn drop(&mut self) { - let err = self.core.close(); - if err.is_err() { - panic!("failed to close core: {:?}", err); +pub(crate) struct TaskRunner { + core: Arc, + writes_rx: Receiver, + stop_rx: Receiver<()>, +} + +impl TaskRunner { + fn new(core: Arc, writes_rx: Receiver, stop_rx: Receiver<()>) -> Self { + Self { + core, + writes_rx, + stop_rx, + } + } + + fn spawn(self) -> JoinHandle<()> { + spawn(Box::pin(async move { + loop { + select! { + req = self.writes_rx.recv().fuse() => { + let task = req.unwrap(); + self.handle_task(task).await + }, + _ = self.stop_rx.recv().fuse() => { + // Consume all remaining items in writes_rx + while let Ok(task) = self.writes_rx.try_recv() { + self.handle_task(task).await; + } + drop(self); + return; + }, + } + } + })) + } + + async fn handle_task(&self, task: Task) { + let core = self.core.clone(); + if let Err(err) = core.write_request(task).await { + eprintln!("failed to write: {:?}", err); } } } @@ -97,15 +176,31 @@ pub struct Core { /// Commit log for store. pub(crate) clog: Arc>, /// Manifest for store to track Store state. - pub(crate) manifest: Aol, + pub(crate) manifest: RwLock, /// Transaction ID Oracle for store. pub(crate) oracle: Arc, /// Value cache for store. - /// The assumption for this cache is that it could be useful for + /// The assumption for this cache is that it should be useful for /// storing offsets that are frequently accessed (especially in /// the case of range scans) - pub(crate) value_cache: Arc>>, + pub(crate) value_cache: Cache, + /// Flag to indicate if the store is closed. is_closed: AtomicBool, + /// Channel to send write requests to the writer + writes_tx: Sender, +} + +/// A Task contains multiple entries to be written to the disk. +#[derive(Clone)] +pub struct Task { + /// Entries contained in this task + entries: Vec, + /// Use channel to notify that the value has been persisted to disk + done: Option>>, + /// Transaction ID + tx_id: u64, + /// Commit timestamp + commit_ts: u64, } impl Core { @@ -115,7 +210,7 @@ impl Core { /// opens or creates the commit log file, loads the index from the commit log if it exists, creates /// and initializes an Oracle, creates and initializes a value cache, and constructs and returns /// the Core instance. - pub fn new(opts: Options) -> Result { + pub fn new(opts: Options, writes_tx: Sender) -> Result { // Initialize a new Indexer with the provided options. let mut indexer = Indexer::new(&opts); @@ -147,18 +242,18 @@ impl Core { oracle.set_ts(indexer.version()); // Create and initialize value cache. - let cache_size = NonZeroUsize::new(opts.max_value_cache_size as usize).unwrap(); - let value_cache = Arc::new(RwLock::new(LruCache::new(cache_size))); + let value_cache = Cache::new(opts.max_value_cache_size as usize); // Construct and return the Core instance. Ok(Self { indexer: RwLock::new(indexer), opts, - manifest, + manifest: RwLock::new(manifest), clog: Arc::new(RwLock::new(clog)), oracle: Arc::new(oracle), value_cache, is_closed: AtomicBool::new(false), + writes_tx, }) } @@ -286,7 +381,7 @@ impl Core { self.is_closed.load(std::sync::atomic::Ordering::Relaxed) } - fn close(&self) -> Result<()> { + pub(crate) fn close(&self) -> Result<()> { if self.is_closed() { return Ok(()); } @@ -302,17 +397,109 @@ impl Core { // Close the commit log self.clog.write().close()?; + // Close the manifest + self.manifest.write().close()?; + self.is_closed .store(true, std::sync::atomic::Ordering::Relaxed); Ok(()) } + + pub(crate) async fn write_request(&self, req: Task) -> Result<()> { + let done = req.done.clone(); + + let result = self.write_entries(req); + + if let Some(done) = done { + done.send(result.clone()).await?; + } + + result + } + + fn write_entries(&self, req: Task) -> Result<()> { + if req.entries.is_empty() { + return Ok(()); + } + + let current_offset = self.clog.read().offset()?; + let tx_record = TxRecord::new_with_entries(req.entries.clone(), req.tx_id, req.commit_ts); + let mut buf = BytesMut::new(); + let mut committed_values_offsets = HashMap::new(); + + tx_record.encode(&mut buf, current_offset, &mut committed_values_offsets)?; + + self.append_to_log(&buf)?; + self.write_to_index(&req, &committed_values_offsets)?; + + Ok(()) + } + + fn append_to_log(&self, tx_record: &BytesMut) -> Result<()> { + let mut clog = self.clog.write(); + clog.append(tx_record)?; + + Ok(()) + } + + fn write_to_index( + &self, + req: &Task, + committed_values_offsets: &HashMap, + ) -> Result<()> { + let mut index = self.indexer.write(); + let mut kv_pairs = Vec::new(); + + for entry in &req.entries { + let index_value = ValueRef::encode( + &entry.key, + &entry.value, + entry.metadata.as_ref(), + committed_values_offsets, + self.opts.max_value_threshold, + ); + + kv_pairs.push(KV { + key: entry.key[..].into(), + value: index_value, + version: req.tx_id, + ts: req.commit_ts, + }); + } + + index.bulk_insert(&mut kv_pairs)?; + + Ok(()) + } + + pub(crate) async fn send_to_write_channel( + &self, + entries: Vec, + tx_id: u64, + commit_ts: u64, + ) -> Result>> { + let (tx, rx) = bounded(1); + let req = Task { + entries, + done: Some(tx), + tx_id, + commit_ts, + }; + self.writes_tx.send(req).await?; + Ok(rx) + } } #[cfg(test)] mod tests { + use std::sync::Arc; + use crate::storage::kv::option::Options; - use crate::storage::kv::store::Store; + use crate::storage::kv::store::{Store, Task, TaskRunner}; + + use async_channel::bounded; + use std::sync::atomic::{AtomicU64, Ordering}; use bytes::Bytes; use tempdir::TempDir; @@ -321,8 +508,8 @@ mod tests { TempDir::new("test").unwrap() } - #[test] - fn bulk_insert() { + #[tokio::test] + async fn bulk_insert() { // Create a temporary directory for testing let temp_dir = create_temp_directory(); @@ -360,20 +547,20 @@ mod tests { // Start a new write transaction let mut txn = store.begin().unwrap(); txn.set(key, &default_value).unwrap(); - txn.commit().unwrap(); + txn.commit().await.unwrap(); } // Read the keys to the store for (_, key) in keys.iter().enumerate() { // Start a new read transaction let txn = store.begin().unwrap(); - let val = txn.get(key).unwrap(); + let val = txn.get(key).unwrap().unwrap(); // Assert that the value retrieved in txn3 matches default_value assert_eq!(val, default_value.as_ref()); } // Drop the store to simulate closing it - drop(store); + store.close().await.unwrap(); // Create a new Core instance with VariableKey after dropping the previous one let mut opts = Options::new(); @@ -386,14 +573,14 @@ mod tests { for (_, key) in keys.iter().enumerate() { // Start a new read transaction let txn = store.begin().unwrap(); - let val = txn.get(key).unwrap(); + let val = txn.get(key).unwrap().unwrap(); // Assert that the value retrieved in txn matches default_value assert_eq!(val, default_value.as_ref()); } } - #[test] - fn store_open_and_reload_options() { + #[tokio::test] + async fn store_open_and_reload_options() { // // Create a temporary directory for testing let temp_dir = create_temp_directory(); @@ -403,6 +590,7 @@ mod tests { // Create a new store instance with VariableKey as the key type let store = Store::new(opts.clone()).expect("should create store"); + store.close().await.unwrap(); drop(store); @@ -416,8 +604,8 @@ mod tests { assert_eq!(store_opts, opts); } - #[test] - fn clone_store() { + #[tokio::test] + async fn clone_store() { // Create a temporary directory for testing let temp_dir = create_temp_directory(); @@ -426,7 +614,7 @@ mod tests { opts.dir = temp_dir.path().to_path_buf(); // Create a new store instance with VariableKey as the key type - let store = Store::new(opts).expect("should create store"); + let store = Arc::new(Store::new(opts).expect("should create store")); // Number of keys to generate let num_keys = 100; @@ -456,7 +644,76 @@ mod tests { // Start a new write transaction let mut txn = store1.begin().unwrap(); txn.set(key, &default_value).unwrap(); - txn.commit().unwrap(); + txn.commit().await.unwrap(); + } + } + + #[tokio::test] + async fn stop_task_runner() { + // Create a temporary directory for testing + let temp_dir = create_temp_directory(); + + // Create store options with the test directory + let mut opts = Options::new(); + opts.dir = temp_dir.path().to_path_buf(); + + // Create a new store instance with VariableKey as the key type + let store = Store::new(opts).expect("should create store"); + + let (writes_tx, writes_rx) = bounded(100); + let (stop_tx, stop_rx) = bounded(1); + let core = &store.core; + + let runner = TaskRunner::new(core.clone(), writes_rx, stop_rx); + let fut = runner.spawn(); + + // Send some tasks + let task_counter = Arc::new(AtomicU64::new(0)); + for i in 0..100 { + let (done_tx, done_rx) = bounded(1); + writes_tx + .send(Task { + entries: vec![], + done: Some(done_tx), + tx_id: i, + commit_ts: i, + }) + .await + .unwrap(); + + let task_counter = Arc::clone(&task_counter); + tokio::spawn(async move { + done_rx.recv().await.unwrap().unwrap(); + task_counter.fetch_add(1, Ordering::SeqCst); + }); } + + // Send stop signal + stop_tx.send(()).await.unwrap(); + + // Wait for a while to let TaskRunner handle all tasks by waiting on done_rx + fut.await.expect("TaskRunner should finish"); + + // Wait for the spawned tokio thread to finish + tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; + + // Check if all tasks were handled + assert_eq!(task_counter.load(Ordering::SeqCst), 100); + } + + async fn concurrent_task(store: Arc) { + let mut txn = store.begin().unwrap(); + txn.set(b"dummy key", b"dummy value").unwrap(); + txn.commit().await.unwrap(); + } + + #[tokio::test] + async fn concurrent_test() { + let mut opts = Options::new(); + opts.dir = create_temp_directory().path().to_path_buf(); + let db = Arc::new(Store::new(opts).expect("should create store")); + let task1 = tokio::spawn(concurrent_task(db.clone())); + let task2 = tokio::spawn(concurrent_task(db.clone())); + let _ = tokio::try_join!(task1, task2).expect("Tasks failed"); } } diff --git a/src/storage/kv/transaction.rs b/src/storage/kv/transaction.rs index 527b7c07..a895634c 100644 --- a/src/storage/kv/transaction.rs +++ b/src/storage/kv/transaction.rs @@ -6,13 +6,13 @@ use hashbrown::HashMap; use parking_lot::{Mutex, RwLock}; use crate::storage::{ - index::{art::TrieError, art::KV, VariableKey}, + index::{art::TrieError, VariableKey}, kv::{ - entry::{Entry, TxRecord, Value, ValueRef}, + entry::{Entry, Value, ValueRef}, error::{Error, Result}, snapshot::{FilterFn, Snapshot, FILTERS}, store::Core, - util::now, + util::{now, sha256}, }, }; @@ -74,11 +74,15 @@ pub struct Transaction { /// `buf` is a reusable buffer for encoding transaction records. This is used to reduce memory allocations. buf: BytesMut, - /// `store` is the underlying store for the transaction. This is shared between transactions. - pub(crate) store: Arc, + /// `core` is the underlying core for the transaction. This is shared between transactions. + pub(crate) core: Arc, - /// `write_set` is the pending writes for the transaction. These are the changes that the transaction wants to make to the data. - pub(crate) write_set: HashMap, + /// `write_order_map` is a mapping from sha256 of keys to their order in the write_set. + pub(crate) write_order_map: HashMap, + + /// `write_set` is a vector of tuples, where each tuple contains a key and its corresponding entry. + /// These are the changes that the transaction intends to make to the data. + pub(crate) write_set: Vec<(Bytes, Entry)>, /// `read_set` is the keys that are read in the transaction from the snapshot. This is used for conflict detection. pub(crate) read_set: Mutex>, @@ -92,17 +96,18 @@ pub struct Transaction { impl Transaction { /// Prepare a new transaction in the given mode. - pub fn new(store: Arc, mode: Mode) -> Result { - let snapshot = RwLock::new(Snapshot::take(store.clone(), now())?); - let read_ts = store.read_ts()?; + pub fn new(core: Arc, mode: Mode) -> Result { + let snapshot = RwLock::new(Snapshot::take(core.clone(), now())?); + let read_ts = core.read_ts()?; Ok(Self { read_ts, mode, snapshot, buf: BytesMut::new(), - store, - write_set: HashMap::new(), + core, + write_order_map: HashMap::new(), + write_set: Vec::new(), read_set: Mutex::new(Vec::new()), committed_values_offsets: HashMap::new(), closed: false, @@ -131,7 +136,7 @@ impl Transaction { } /// Gets a value for a key if it exists. - pub fn get(&self, key: &[u8]) -> Result> { + pub fn get(&self, key: &[u8]) -> Result>> { // If the transaction is closed, return an error. if self.closed { return Err(Error::TransactionClosed); @@ -143,13 +148,17 @@ impl Transaction { // Create a copy of the key. let key = Bytes::copy_from_slice(key); + let hashed_key = sha256(key.clone()); // Attempt to get the value for the key from the snapshot. match self.snapshot.read().get(&key[..].into()) { Ok(val_ref) => { // RYOW semantics: Read your own write. If the key is in the write set, return the value. - if let Some(entry) = self.write_set.get(&key) { - return Ok(entry.value.clone().to_vec()); + // Check if the key is in the write set by checking in the write_order_map map. + if let Some(order) = self.write_order_map.get(&hashed_key) { + if let Some(entry) = self.write_set.get(*order as usize) { + return Ok(Some(entry.1.value.clone().to_vec())); + } } // If the transaction is not read-only and the value reference has a timestamp greater than 0, @@ -159,7 +168,7 @@ impl Transaction { } // Resolve the value reference to get the actual value. - val_ref.resolve() + val_ref.resolve().map(Some) } Err(e) => { match &e { @@ -171,7 +180,7 @@ impl Transaction { self.read_set.lock().push((key, 0)); } } - Err(e) + Ok(None) } // For other errors, just propagate them. _ => Err(e), @@ -195,15 +204,15 @@ impl Transaction { return Err(Error::EmptyKey); } // If the key length exceeds the maximum allowed key size, return an error. - if e.key.len() as u64 > self.store.opts.max_key_size { + if e.key.len() as u64 > self.core.opts.max_key_size { return Err(Error::MaxKeyLengthExceeded); } // If the value length exceeds the maximum allowed value size, return an error. - if e.value.len() as u64 > self.store.opts.max_value_size { + if e.value.len() as u64 > self.core.opts.max_value_size { return Err(Error::MaxValueLengthExceeded); } - if self.write_set.len() as u32 >= self.store.opts.max_tx_entries { + if self.write_set.len() as u32 >= self.core.opts.max_tx_entries { return Err(Error::MaxTransactionEntriesLimitExceeded); } @@ -222,13 +231,26 @@ impl Transaction { } // Add the entry to the set of pending writes. - self.write_set.insert(e.key.clone(), e); + let hashed_key = sha256(e.key.clone()); + + // Check if the key already exists in write_order_map, if so, update the entry in write_set. + if let Some(order) = self.write_order_map.get(&hashed_key) { + self.write_set[*order as usize] = (e.key.clone(), e); + } else { + self.write_set.push((e.key.clone(), e)); + self.write_order_map + .insert(hashed_key, self.write_order_map.len() as u32); + } Ok(()) } /// Scans a range of keys and returns a vector of tuples containing the value, version, and timestamp for each key. - pub fn scan<'b, R>(&'b self, range: R) -> Result, u64, u64)>> + pub fn scan<'b, R>( + &'b self, + range: R, + limit: Option, + ) -> Result, Vec, u64, u64)>> where R: RangeBounds<&'b [u8]>, { @@ -254,19 +276,30 @@ impl Transaction { }, ); + // Initialize an empty vector to store the results. + let mut results = Vec::new(); + // Create a new reader for the snapshot. - let iterator = self.snapshot.write().new_reader()?; + let iterator = match self.snapshot.write().new_reader() { + Ok(reader) => reader, + Err(Error::IndexError(TrieError::SnapshotEmpty)) => return Ok(Vec::new()), + Err(e) => return Err(e), + }; // Get a range iterator for the specified range. let ranger = iterator.range(range); - // Initialize an empty vector to store the results. - let mut results = Vec::new(); - // Iterate over the keys in the range. 'outer: for (key, value, version, ts) in ranger { + // If a limit is set and we've already got enough results, break the loop. + if let Some(limit) = limit { + if results.len() >= limit { + break; + } + } + // Create a new value reference and decode the value. - let mut val_ref = ValueRef::new(self.store.clone()); + let mut val_ref = ValueRef::new(self.core.clone()); let val_bytes_ref: &Bytes = value; val_ref.decode(*version, val_bytes_ref)?; @@ -290,7 +323,9 @@ impl Transaction { let v = val_ref.resolve()?; // Add the value, version, and timestamp to the results vector. - results.push((v, *version, *ts)); + let mut key = key; + key.truncate(key.len() - 1); + results.push((key, v, *version, *ts)); } // Return the results. @@ -298,7 +333,7 @@ impl Transaction { } /// Commits the transaction, by writing all pending entries to the store. - pub fn commit(&mut self) -> Result<()> { + pub async fn commit(&mut self) -> Result<()> { // If the transaction is closed, return an error. if self.closed { return Err(Error::TransactionClosed); @@ -314,19 +349,36 @@ impl Transaction { return Ok(()); } - // TODO: Use a commit pipeline to avoid blocking calls. // Lock the oracle to serialize commits to the transaction log. - let oracle = self.store.oracle.clone(); - let _lock = oracle.write_lock.lock(); + let oracle = self.core.oracle.clone(); + let write_ch_lock = oracle.write_lock.lock().await; // Prepare for the commit by getting a transaction ID and a commit timestamp. let (tx_id, commit_ts) = self.prepare_commit()?; - // Add transaction records to the transaction log. - self.add_to_transaction_log(tx_id, commit_ts)?; + // Sort the keys in the write set and create a vector of entries. + let entries: Vec = self + .write_set + .iter() + .map(|(_, entry)| entry.clone()) + .collect(); // Commit the changes to the store index. - self.commit_to_index(tx_id, commit_ts)?; + let done = self + .core + .send_to_write_channel(entries, tx_id, commit_ts) + .await; + + if let Err(err) = done { + oracle.committed_upto(commit_ts); + return Err(err); + } + + drop(write_ch_lock); + + // Check if the transaction is written to the transaction log. + let done = done.unwrap(); + let ret = done.recv().await?; // Update the oracle to indicate that the transaction has been committed up to the given transaction ID. oracle.committed_upto(tx_id); @@ -334,12 +386,13 @@ impl Transaction { // Mark the transaction as closed. self.closed = true; - Ok(()) + // Ok(()) + ret } /// Prepares for the commit by assigning commit timestamps and preparing records. fn prepare_commit(&mut self) -> Result<(u64, u64)> { - let oracle = self.store.oracle.clone(); + let oracle = self.core.oracle.clone(); let tx_id = oracle.new_commit_ts(self)?; let commit_ts = self.assign_commit_ts(); Ok((tx_id, commit_ts)) @@ -354,61 +407,6 @@ impl Transaction { commit_ts } - /// Adds transaction records to the transaction log. - fn add_to_transaction_log(&mut self, tx_id: u64, commit_ts: u64) -> Result { - let current_offset = self.store.clog.read().offset()?; - let entries: Vec = self.write_set.values().cloned().collect(); - let tx_record = TxRecord::new_with_entries(entries, tx_id, commit_ts); - tx_record.encode( - &mut self.buf, - current_offset, - &mut self.committed_values_offsets, - )?; - - let mut clog = self.store.clog.write(); - let (tx_offset, _) = clog.append(self.buf.as_ref())?; - Ok(tx_offset) - } - - /// Commits transaction changes to the store index. - fn commit_to_index(&mut self, tx_id: u64, commit_ts: u64) -> Result<()> { - let mut index = self.store.indexer.write(); - let mut kv_pairs = self.build_kv_pairs(tx_id, commit_ts); - - index.bulk_insert(&mut kv_pairs)?; - Ok(()) - } - - /// Builds key-value pairs from the write set. - fn build_kv_pairs(&self, tx_id: u64, commit_ts: u64) -> Vec> { - let mut kv_pairs = Vec::new(); - - for (_, entry) in self.write_set.iter() { - let index_value = self.build_index_value(entry); - - kv_pairs.push(KV { - key: entry.key[..].into(), - value: index_value, - version: tx_id, - ts: commit_ts, - }); - } - - kv_pairs - } - - /// Builds an index value from an entry. - fn build_index_value(&self, entry: &Entry) -> Bytes { - let index_value = ValueRef::encode( - &entry.key, - &entry.value, - entry.metadata.as_ref(), - &self.committed_values_offsets, - self.store.opts.max_value_threshold, - ); - index_value - } - /// Rolls back the transaction by removing all updated entries. pub fn rollback(&mut self) { self.closed = true; @@ -454,8 +452,8 @@ mod tests { ) } - #[test] - fn basic_transaction() { + #[tokio::test] + async fn basic_transaction() { let (store, temp_dir) = create_store(false); // Define key-value pairs for the test @@ -469,13 +467,13 @@ mod tests { let mut txn1 = store.begin().unwrap(); txn1.set(&key1, &value1).unwrap(); txn1.set(&key2, &value1).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); } { // Start a read-only transaction (txn3) let txn3 = store.begin().unwrap(); - let val = txn3.get(&key1).unwrap(); + let val = txn3.get(&key1).unwrap().unwrap(); assert_eq!(val, value1.as_ref()); } @@ -484,11 +482,11 @@ mod tests { let mut txn2 = store.begin().unwrap(); txn2.set(&key1, &value2).unwrap(); txn2.set(&key2, &value2).unwrap(); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); } // Drop the store to simulate closing it - drop(store); + store.close().await.unwrap(); // Create a new Core instance with VariableKey after dropping the previous one let mut opts = Options::new(); @@ -497,14 +495,14 @@ mod tests { // Start a read-only transaction (txn4) let txn4 = store.begin().unwrap(); - let val = txn4.get(&key1).unwrap(); + let val = txn4.get(&key1).unwrap().unwrap(); // Assert that the value retrieved in txn4 matches value2 assert_eq!(val, value2.as_ref()); } - #[test] - fn transaction_delete_scan() { + #[tokio::test] + async fn transaction_delete_scan() { let (store, _) = create_store(false); // Define key-value pairs for the test @@ -516,31 +514,31 @@ mod tests { let mut txn1 = store.begin().unwrap(); txn1.set(&key1, &value1).unwrap(); txn1.set(&key1, &value1).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); } { // Start a read-only transaction (txn) let mut txn = store.begin().unwrap(); txn.delete(&key1).unwrap(); - txn.commit().unwrap(); + txn.commit().await.unwrap(); } { // Start another read-write transaction (txn) let txn = store.begin().unwrap(); - assert!(txn.get(&key1).is_err()); + assert!(txn.get(&key1).unwrap().is_none()); } { let range = "k1".as_bytes()..="k3".as_bytes(); let txn = store.begin().unwrap(); - let results = txn.scan(range).unwrap(); + let results = txn.scan(range, None).unwrap(); assert_eq!(results.len(), 0); } } - fn mvcc_tests(is_ssi: bool) { + async fn mvcc_tests(is_ssi: bool) { let (store, _) = create_store(is_ssi); let key1 = Bytes::from("key1"); @@ -554,11 +552,11 @@ mod tests { let mut txn2 = store.begin().unwrap(); txn1.set(&key1, &value1).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); - assert!(txn2.get(&key2).is_err()); + assert!(txn2.get(&key2).unwrap().is_none()); txn2.set(&key2, &value2).unwrap(); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); } // read conflict when the read key was updated by another transaction @@ -567,11 +565,11 @@ mod tests { let mut txn2 = store.begin().unwrap(); txn1.set(&key1, &value1).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); assert!(txn2.get(&key1).is_ok()); txn2.set(&key1, &value2).unwrap(); - assert!(match txn2.commit() { + assert!(match txn2.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -591,11 +589,11 @@ mod tests { txn1.set(&key1, &value1).unwrap(); txn2.set(&key1, &value2).unwrap(); - txn1.commit().unwrap(); - txn2.commit().unwrap(); + txn1.commit().await.unwrap(); + txn2.commit().await.unwrap(); let txn3 = store.begin().unwrap(); - let val = txn3.get(&key1).unwrap(); + let val = txn3.get(&key1).unwrap().unwrap(); assert_eq!(val, value2.as_ref()); } @@ -607,11 +605,11 @@ mod tests { let mut txn2 = store.begin().unwrap(); txn1.set(&key, &value1).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); - assert!(txn2.get(&key).is_err()); + assert!(txn2.get(&key).unwrap().is_none()); txn2.set(&key, &value1).unwrap(); - assert!(match txn2.commit() { + assert!(match txn2.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -629,17 +627,17 @@ mod tests { let mut txn1 = store.begin().unwrap(); txn1.set(&key, &value1).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); let mut txn2 = store.begin().unwrap(); let mut txn3 = store.begin().unwrap(); txn2.delete(&key).unwrap(); - assert!(txn2.commit().is_ok()); + assert!(txn2.commit().await.is_ok()); assert!(txn3.get(&key).is_ok()); txn3.set(&key, &value2).unwrap(); - assert!(match txn3.commit() { + assert!(match txn3.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -652,18 +650,18 @@ mod tests { } } - #[test] - fn mvcc_serialized_snapshot_isolation() { - mvcc_tests(true); + #[tokio::test] + async fn mvcc_serialized_snapshot_isolation() { + mvcc_tests(true).await; } - #[test] - fn mvcc_snapshot_isolation() { - mvcc_tests(false); + #[tokio::test] + async fn mvcc_snapshot_isolation() { + mvcc_tests(false).await; } - #[test] - fn basic_scan_single_key() { + #[tokio::test] + async fn basic_scan_single_key() { let (store, _) = create_store(false); // Define key-value pairs for the test let keys_to_insert = vec![Bytes::from("key1")]; @@ -671,18 +669,18 @@ mod tests { for key in &keys_to_insert { let mut txn = store.begin().unwrap(); txn.set(key, key).unwrap(); - txn.commit().unwrap(); + txn.commit().await.unwrap(); } let range = "key1".as_bytes()..="key3".as_bytes(); let txn = store.begin().unwrap(); - let results = txn.scan(range).unwrap(); + let results = txn.scan(range, None).unwrap(); assert_eq!(results.len(), keys_to_insert.len()); } - #[test] - fn basic_scan_multiple_keys() { + #[tokio::test] + async fn basic_scan_multiple_keys() { let (store, _) = create_store(false); // Define key-value pairs for the test let keys_to_insert = vec![ @@ -695,20 +693,49 @@ mod tests { for key in &keys_to_insert { let mut txn = store.begin().unwrap(); txn.set(key, key).unwrap(); - txn.commit().unwrap(); + txn.commit().await.unwrap(); } let range = "key1".as_bytes()..="key3".as_bytes(); let txn = store.begin().unwrap(); - let results = txn.scan(range).unwrap(); + let results = txn.scan(range, None).unwrap(); + assert_eq!(results.len(), 3); + assert_eq!(results[0].1, keys_to_insert[0]); + assert_eq!(results[1].1, keys_to_insert[1]); + assert_eq!(results[2].1, keys_to_insert[2]); + } + + #[tokio::test] + async fn scan_multiple_keys_within_single_transaction() { + let (store, _) = create_store(false); + // Define key-value pairs for the test + let keys_to_insert = vec![ + Bytes::from("test1"), + Bytes::from("test2"), + Bytes::from("test3"), + ]; + + let mut txn = store.begin().unwrap(); + for key in &keys_to_insert { + txn.set(key, key).unwrap(); + } + txn.commit().await.unwrap(); + + let range = "test1".as_bytes()..="test7".as_bytes(); + + let txn = store.begin().unwrap(); + let results = txn.scan(range, None).unwrap(); assert_eq!(results.len(), 3); assert_eq!(results[0].0, keys_to_insert[0]); assert_eq!(results[1].0, keys_to_insert[1]); assert_eq!(results[2].0, keys_to_insert[2]); + assert_eq!(results[0].1, keys_to_insert[0]); + assert_eq!(results[1].1, keys_to_insert[1]); + assert_eq!(results[2].1, keys_to_insert[2]); } - fn mvcc_with_scan_tests(is_ssi: bool) { + async fn mvcc_with_scan_tests(is_ssi: bool) { let (store, _) = create_store(is_ssi); let key1 = Bytes::from("key1"); @@ -727,7 +754,7 @@ mod tests { let mut txn1 = store.begin().unwrap(); txn1.set(&key1, &value1).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); let mut txn2 = store.begin().unwrap(); let mut txn3 = store.begin().unwrap(); @@ -735,15 +762,15 @@ mod tests { txn2.set(&key1, &value4).unwrap(); txn2.set(&key2, &value2).unwrap(); txn2.set(&key3, &value3).unwrap(); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); let range = "key1".as_bytes()..="key4".as_bytes(); - let results = txn3.scan(range).unwrap(); + let results = txn3.scan(range, None).unwrap(); assert_eq!(results.len(), 1); txn3.set(&key2, &value5).unwrap(); txn3.set(&key3, &value6).unwrap(); - assert!(match txn3.commit() { + assert!(match txn3.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -760,19 +787,19 @@ mod tests { let mut txn1 = store.begin().unwrap(); txn1.set(&key4, &value1).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); let mut txn2 = store.begin().unwrap(); let mut txn3 = store.begin().unwrap(); txn2.delete(&key4).unwrap(); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); let range = "key1".as_bytes()..="key5".as_bytes(); - txn3.scan(range).unwrap(); + txn3.scan(range, None).unwrap(); txn3.set(&key4, &value2).unwrap(); - assert!(match txn3.commit() { + assert!(match txn3.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -785,18 +812,18 @@ mod tests { } } - #[test] - fn mvcc_serialized_snapshot_isolation_scan() { - mvcc_with_scan_tests(true); + #[tokio::test] + async fn mvcc_serialized_snapshot_isolation_scan() { + mvcc_with_scan_tests(true).await; } - #[test] - fn mvcc_snapshot_isolation_scan() { - mvcc_with_scan_tests(false); + #[tokio::test] + async fn mvcc_snapshot_isolation_scan() { + mvcc_with_scan_tests(false).await; } - #[test] - fn ryow() { + #[tokio::test] + async fn ryow() { let temp_dir = create_temp_directory(); let mut opts = Options::new(); opts.dir = temp_dir.path().to_path_buf(); @@ -811,23 +838,23 @@ mod tests { { let mut txn = store.begin().unwrap(); txn.set(&key1, &value1).unwrap(); - txn.commit().unwrap(); + txn.commit().await.unwrap(); } { // Start a new read-write transaction (txn) let mut txn = store.begin().unwrap(); txn.set(&key1, &value2).unwrap(); - assert_eq!(txn.get(&key1).unwrap(), value2.as_ref()); - assert!(txn.get(&key3).is_err()); + assert_eq!(txn.get(&key1).unwrap().unwrap(), value2.as_ref()); + assert!(txn.get(&key3).unwrap().is_none()); txn.set(&key2, &value1).unwrap(); - assert_eq!(txn.get(&key2).unwrap(), value1.as_ref()); - txn.commit().unwrap(); + assert_eq!(txn.get(&key2).unwrap().unwrap(), value1.as_ref()); + txn.commit().await.unwrap(); } } // Common setup logic for creating a store - fn create_hermitage_store(is_ssi: bool) -> Store { + async fn create_hermitage_store(is_ssi: bool) -> Store { let (store, _) = create_store(is_ssi); let key1 = Bytes::from("k1"); @@ -838,7 +865,7 @@ mod tests { let mut txn = store.begin().unwrap(); txn.set(&key1, &value1).unwrap(); txn.set(&key2, &value2).unwrap(); - txn.commit().unwrap(); + txn.commit().await.unwrap(); store } @@ -847,8 +874,8 @@ mod tests { // Specifically, the tests are derived from FoundationDB tests: https://github.com/ept/hermitage/blob/master/foundationdb.md // G0: Write Cycles (dirty writes) - fn g0_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn g0_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let key2 = Bytes::from("k2"); let value3 = Bytes::from("v3"); @@ -870,10 +897,10 @@ mod tests { txn1.set(&key2, &value5).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); txn2.set(&key2, &value6).unwrap(); - assert!(match txn2.commit() { + assert!(match txn2.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -887,22 +914,22 @@ mod tests { { let txn3 = store.begin().unwrap(); - let val1 = txn3.get(&key1).unwrap(); + let val1 = txn3.get(&key1).unwrap().unwrap(); assert_eq!(val1, value3.as_ref()); - let val2 = txn3.get(&key2).unwrap(); + let val2 = txn3.get(&key2).unwrap().unwrap(); assert_eq!(val2, value5.as_ref()); } } - #[test] - fn g0() { - g0_tests(false); // snapshot isolation - g0_tests(true); // serializable snapshot isolation + #[tokio::test] + async fn g0() { + g0_tests(false).await; // snapshot isolation + g0_tests(true).await; // serializable snapshot isolation } // G1a: Aborted Reads (dirty reads, cascaded aborts) - fn g1a_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn g1a_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let key2 = Bytes::from("k2"); let value1 = Bytes::from("v1"); @@ -918,37 +945,37 @@ mod tests { txn1.set(&key1, &value3).unwrap(); let range = "k1".as_bytes()..="k3".as_bytes(); - let res = txn2.scan(range.clone()).unwrap(); + let res = txn2.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); + assert_eq!(res[0].1, value1); drop(txn1); - let res = txn2.scan(range).unwrap(); + let res = txn2.scan(range, None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); + assert_eq!(res[0].1, value1); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); } { let txn3 = store.begin().unwrap(); - let val1 = txn3.get(&key1).unwrap(); + let val1 = txn3.get(&key1).unwrap().unwrap(); assert_eq!(val1, value1.as_ref()); - let val2 = txn3.get(&key2).unwrap(); + let val2 = txn3.get(&key2).unwrap().unwrap(); assert_eq!(val2, value2.as_ref()); } } - #[test] - fn g1a() { - g1a_tests(false); // snapshot isolation - g1a_tests(true); // serializable snapshot isolation + #[tokio::test] + async fn g1a() { + g1a_tests(false).await; // snapshot isolation + g1a_tests(true).await; // serializable snapshot isolation } // G1b: Intermediate Reads (dirty reads) - fn g1b_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn g1b_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let key2 = Bytes::from("k2"); @@ -966,30 +993,30 @@ mod tests { txn1.set(&key1, &value3).unwrap(); let range = "k1".as_bytes()..="k3".as_bytes(); - let res = txn2.scan(range.clone()).unwrap(); + let res = txn2.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); + assert_eq!(res[0].1, value1); txn1.set(&key1, &value4).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); - let res = txn2.scan(range).unwrap(); + let res = txn2.scan(range, None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); + assert_eq!(res[0].1, value1); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); } } - #[test] - fn g1b() { - g1b_tests(false); // snapshot isolation - g1b_tests(true); // serializable snapshot isolation + #[tokio::test] + async fn g1b() { + g1b_tests(false).await; // snapshot isolation + g1b_tests(true).await; // serializable snapshot isolation } // G1c: Circular Information Flow (dirty reads) - fn g1c_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn g1c_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let key2 = Bytes::from("k2"); @@ -1008,11 +1035,11 @@ mod tests { txn1.set(&key1, &value3).unwrap(); txn2.set(&key2, &value4).unwrap(); - assert_eq!(txn1.get(&key2).unwrap(), value2.as_ref()); - assert_eq!(txn2.get(&key1).unwrap(), value1.as_ref()); + assert_eq!(txn1.get(&key2).unwrap().unwrap(), value2.as_ref()); + assert_eq!(txn2.get(&key1).unwrap().unwrap(), value1.as_ref()); - txn1.commit().unwrap(); - assert!(match txn2.commit() { + txn1.commit().await.unwrap(); + assert!(match txn2.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -1025,15 +1052,15 @@ mod tests { } } - #[test] - fn g1c() { - g1c_tests(false); // snapshot isolation - g1c_tests(true); + #[tokio::test] + async fn g1c() { + g1c_tests(false).await; // snapshot isolation + g1c_tests(true).await; } // PMP: Predicate-Many-Preceders - fn pmp_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn pmp_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key3 = Bytes::from("k3"); let value1 = Bytes::from("v1"); @@ -1046,33 +1073,33 @@ mod tests { // k3 should not be visible to txn1 let range = "k1".as_bytes()..="k3".as_bytes(); - let res = txn1.scan(range.clone()).unwrap(); + let res = txn1.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); - assert_eq!(res[1].0, value2); + assert_eq!(res[0].1, value1); + assert_eq!(res[1].1, value2); // k3 is committed by txn2 txn2.set(&key3, &value3).unwrap(); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); // k3 should still not be visible to txn1 let range = "k1".as_bytes()..="k3".as_bytes(); - let res = txn1.scan(range.clone()).unwrap(); + let res = txn1.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); - assert_eq!(res[1].0, value2); + assert_eq!(res[0].1, value1); + assert_eq!(res[1].1, value2); } } - #[test] - fn pmp() { - pmp_tests(false); - pmp_tests(true); + #[tokio::test] + async fn pmp() { + pmp_tests(false).await; + pmp_tests(true).await; } // PMP-Write: Circular Information Flow (dirty reads) - fn pmp_write_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn pmp_write_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let key2 = Bytes::from("k2"); @@ -1088,20 +1115,20 @@ mod tests { txn1.set(&key1, &value3).unwrap(); let range = "k1".as_bytes()..="k2".as_bytes(); - let res = txn2.scan(range.clone()).unwrap(); + let res = txn2.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); - assert_eq!(res[1].0, value2); + assert_eq!(res[0].1, value1); + assert_eq!(res[1].1, value2); txn2.delete(&key2).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); let range = "k1".as_bytes()..="k3".as_bytes(); - let res = txn2.scan(range.clone()).unwrap(); + let res = txn2.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 1); - assert_eq!(res[0].0, value1); + assert_eq!(res[0].1, value1); - assert!(match txn2.commit() { + assert!(match txn2.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -1114,15 +1141,15 @@ mod tests { } } - #[test] - fn pmp_write() { - pmp_write_tests(false); - pmp_write_tests(true); + #[tokio::test] + async fn pmp_write() { + pmp_write_tests(false).await; + pmp_write_tests(true).await; } // P4: Lost Update - fn p4_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn p4_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let value3 = Bytes::from("v3"); @@ -1137,9 +1164,9 @@ mod tests { txn1.set(&key1, &value3).unwrap(); txn2.set(&key1, &value3).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); - assert!(match txn2.commit() { + assert!(match txn2.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -1152,15 +1179,15 @@ mod tests { } } - #[test] - fn p4() { - p4_tests(false); - p4_tests(true); + #[tokio::test] + async fn p4() { + p4_tests(false).await; + p4_tests(true).await; } // G-single: Single Anti-dependency Cycles (read skew) - fn g_single_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn g_single_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let key2 = Bytes::from("k2"); @@ -1173,28 +1200,28 @@ mod tests { let mut txn1 = store.begin().unwrap(); let mut txn2 = store.begin().unwrap(); - assert_eq!(txn1.get(&key1).unwrap(), value1.as_ref()); - assert_eq!(txn2.get(&key1).unwrap(), value1.as_ref()); - assert_eq!(txn2.get(&key2).unwrap(), value2.as_ref()); + assert_eq!(txn1.get(&key1).unwrap().unwrap(), value1.as_ref()); + assert_eq!(txn2.get(&key1).unwrap().unwrap(), value1.as_ref()); + assert_eq!(txn2.get(&key2).unwrap().unwrap(), value2.as_ref()); txn2.set(&key1, &value3).unwrap(); txn2.set(&key2, &value4).unwrap(); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); - assert_eq!(txn1.get(&key2).unwrap(), value2.as_ref()); - txn1.commit().unwrap(); + assert_eq!(txn1.get(&key2).unwrap().unwrap(), value2.as_ref()); + txn1.commit().await.unwrap(); } } - #[test] - fn g_single() { - g_single_tests(false); - g_single_tests(true); + #[tokio::test] + async fn g_single() { + g_single_tests(false).await; + g_single_tests(true).await; } // G-single-write-1: Single Anti-dependency Cycles (read skew) - fn g_single_write_1_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn g_single_write_1_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let key2 = Bytes::from("k2"); @@ -1207,22 +1234,22 @@ mod tests { let mut txn1 = store.begin().unwrap(); let mut txn2 = store.begin().unwrap(); - assert_eq!(txn1.get(&key1).unwrap(), value1.as_ref()); + assert_eq!(txn1.get(&key1).unwrap().unwrap(), value1.as_ref()); let range = "k1".as_bytes()..="k2".as_bytes(); - let res = txn2.scan(range.clone()).unwrap(); + let res = txn2.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); - assert_eq!(res[1].0, value2); + assert_eq!(res[0].1, value1); + assert_eq!(res[1].1, value2); txn2.set(&key1, &value3).unwrap(); txn2.set(&key2, &value4).unwrap(); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); txn1.delete(&key2).unwrap(); - assert!(txn1.get(&key2).is_err()); - assert!(match txn1.commit() { + assert!(txn1.get(&key2).unwrap().is_none()); + assert!(match txn1.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -1235,15 +1262,15 @@ mod tests { } } - #[test] - fn g_single_write_1() { - g_single_write_1_tests(false); - g_single_write_1_tests(true); + #[tokio::test] + async fn g_single_write_1() { + g_single_write_1_tests(false).await; + g_single_write_1_tests(true).await; } // G-single-write-2: Single Anti-dependency Cycles (read skew) - fn g_single_write_2_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn g_single_write_2_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let key2 = Bytes::from("k2"); @@ -1256,12 +1283,12 @@ mod tests { let mut txn1 = store.begin().unwrap(); let mut txn2 = store.begin().unwrap(); - assert_eq!(txn1.get(&key1).unwrap(), value1.as_ref()); + assert_eq!(txn1.get(&key1).unwrap().unwrap(), value1.as_ref()); let range = "k1".as_bytes()..="k2".as_bytes(); - let res = txn2.scan(range.clone()).unwrap(); + let res = txn2.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); - assert_eq!(res[1].0, value2); + assert_eq!(res[0].1, value1); + assert_eq!(res[1].1, value2); txn2.set(&key1, &value3).unwrap(); @@ -1271,18 +1298,18 @@ mod tests { drop(txn1); - txn2.commit().unwrap(); + txn2.commit().await.unwrap(); } } - #[test] - fn g_single_write_2() { - g_single_write_2_tests(false); - g_single_write_2_tests(true); + #[tokio::test] + async fn g_single_write_2() { + g_single_write_2_tests(false).await; + g_single_write_2_tests(true).await; } - fn g2_item_tests(is_ssi: bool) { - let store = create_hermitage_store(is_ssi); + async fn g2_item_tests(is_ssi: bool) { + let store = create_hermitage_store(is_ssi).await; let key1 = Bytes::from("k1"); let key2 = Bytes::from("k2"); @@ -1296,22 +1323,22 @@ mod tests { let mut txn2 = store.begin().unwrap(); let range = "k1".as_bytes()..="k2".as_bytes(); - let res = txn1.scan(range.clone()).unwrap(); + let res = txn1.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); - assert_eq!(res[1].0, value2); + assert_eq!(res[0].1, value1); + assert_eq!(res[1].1, value2); - let res = txn2.scan(range.clone()).unwrap(); + let res = txn2.scan(range.clone(), None).unwrap(); assert_eq!(res.len(), 2); - assert_eq!(res[0].0, value1); - assert_eq!(res[1].0, value2); + assert_eq!(res[0].1, value1); + assert_eq!(res[1].1, value2); txn1.set(&key1, &value3).unwrap(); txn2.set(&key2, &value4).unwrap(); - txn1.commit().unwrap(); + txn1.commit().await.unwrap(); - assert!(match txn2.commit() { + assert!(match txn2.commit().await { Err(err) => { if let Error::TransactionReadConflict = err { true @@ -1324,17 +1351,17 @@ mod tests { } } - #[test] - fn g2_item() { - g2_item_tests(false); - g2_item_tests(true); + #[tokio::test] + async fn g2_item() { + g2_item_tests(false).await; + g2_item_tests(true).await; } fn require_send(_: T) {} fn require_sync(_: T) {} - #[test] - fn is_send_sync() { + #[tokio::test] + async fn is_send_sync() { let (db, _) = create_store(false); let txn = db.begin().unwrap(); @@ -1344,8 +1371,8 @@ mod tests { require_sync(txn); } - #[test] - fn max_transaction_entries_limit_exceeded() { + #[tokio::test] + async fn max_transaction_entries_limit_exceeded() { let temp_dir = create_temp_directory(); let mut opts = Options::new(); opts.dir = temp_dir.path().to_path_buf(); @@ -1383,7 +1410,7 @@ mod tests { } } - const ENTRIES: usize = 4_000_00; + const ENTRIES: usize = 400_000; const KEY_SIZE: usize = 24; const VALUE_SIZE: usize = 150; const RNG_SEED: u64 = 3; @@ -1428,9 +1455,9 @@ mod tests { fastrand::Rng::with_seed(RNG_SEED) } - #[test] + #[tokio::test] #[ignore] - fn insert_large_txn_and_get() { + async fn insert_large_txn_and_get() { let temp_dir = create_temp_directory(); let mut opts = Options::new(); opts.dir = temp_dir.path().to_path_buf(); @@ -1444,15 +1471,26 @@ mod tests { let (key, value) = gen_pair(&mut rng); txn.set(&key, &value).unwrap(); } - txn.commit().unwrap(); + txn.commit().await.unwrap(); drop(txn); // Read the keys from the store let mut rng = make_rng(); let txn = store.begin_with_mode(Mode::ReadOnly).unwrap(); - for i in 0..ENTRIES { + for _i in 0..ENTRIES { let (key, _) = gen_pair(&mut rng); txn.get(&key).unwrap(); } } + + #[tokio::test] + async fn empty_scan_should_not_return_an_error() { + let (store, _) = create_store(false); + + let range = "key1".as_bytes()..="key3".as_bytes(); + + let txn = store.begin().unwrap(); + let results = txn.scan(range, None).unwrap(); + assert_eq!(results.len(), 0); + } } diff --git a/src/storage/kv/util.rs b/src/storage/kv/util.rs index 485b94cd..452ee326 100644 --- a/src/storage/kv/util.rs +++ b/src/storage/kv/util.rs @@ -1,5 +1,7 @@ +use bytes::Bytes; use chrono::Utc; use crc32fast::Hasher as crc32Hasher; +use sha2::{Digest, Sha256}; /// Calculates the CRC32 hash of a byte array. /// It creates a new CRC32 hasher, updates it with the byte array, and finalizes the hash. @@ -26,3 +28,10 @@ pub(crate) fn now() -> u64 { assert!(timestamp > 0); timestamp as u64 } + +pub(crate) fn sha256(arg: Bytes) -> Bytes { + let mut hasher = Sha256::new(); + hasher.update(arg); + let result = hasher.finalize(); + Bytes::copy_from_slice(result.as_slice()) +} diff --git a/src/storage/log/mod.rs b/src/storage/log/mod.rs index 9c32f4f0..f888000c 100644 --- a/src/storage/log/mod.rs +++ b/src/storage/log/mod.rs @@ -1887,7 +1887,7 @@ mod tests { // Test reading from segment after appending let mut bs = vec![0; 4]; - let n = segment.read_at(&mut bs, 11 as u64).expect("should read"); + let n = segment.read_at(&mut bs, 11_u64).expect("should read"); assert_eq!(4, n); assert_eq!(&[11, 12, 13, 14].to_vec(), &bs[..]); @@ -2226,10 +2226,10 @@ mod tests { assert_eq!(&[0, 1, 2, 3].to_vec(), &bs[WAL_RECORD_HEADER_SIZE..]); // Read remaining empty block - const remaining: usize = BLOCK_SIZE - 11; - let mut bs = [0u8; remaining]; + const REMAINING: usize = BLOCK_SIZE - 11; + let mut bs = [0u8; REMAINING]; let bytes_read = buf_reader.read(&mut bs).expect("should read"); - assert_eq!(bytes_read, remaining); + assert_eq!(bytes_read, REMAINING); // Read second record from the MultiSegmentReader let mut bs = [0u8; 11]; @@ -2238,9 +2238,9 @@ mod tests { assert_eq!(&[4, 5, 6, 7].to_vec(), &bs[WAL_RECORD_HEADER_SIZE..]); // Read remaining empty block - let mut bs = [0u8; remaining]; + let mut bs = [0u8; REMAINING]; let bytes_read = buf_reader.read(&mut bs).expect("should read"); - assert_eq!(bytes_read, remaining); + assert_eq!(bytes_read, REMAINING); let mut bs = [0u8; 11]; buf_reader.read(&mut bs).expect_err("should not read"); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 179d41df..fc8784aa 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,4 +1,4 @@ pub mod cache; -pub mod index; -pub mod kv; -pub mod log; +pub(crate) mod index; +pub(crate) mod kv; +pub(crate) mod log;