From 33d7e79cf7d399f21471ba87eba56fcd7548bf57 Mon Sep 17 00:00:00 2001 From: Flix Date: Fri, 27 May 2022 08:01:44 +0200 Subject: [PATCH] feat: Initial version --- CHANGELOG.md | 11 + Cargo.lock | 632 ++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 36 ++- LICENSE.md | 21 ++ README.md | 63 +++- examples/common/mod.rs | 22 ++ examples/simple.rs | 62 ++++ src/error.rs | 36 +++ src/job.rs | 125 ++++++++ src/lib.rs | 18 +- src/queue.rs | 101 +++++-- src/registry.rs | 115 ++++++++ src/runner.rs | 235 +++++++++++++++ src/spawn.rs | 179 ++++++++++++ src/users.rs | 31 ++ src/utils.rs | 35 +++ tests/common/mod.rs | 21 ++ tests/general.rs | 74 +++++ tests/ordering.rs | 58 ++++ tests/stress.rs | 60 ++++ 20 files changed, 1894 insertions(+), 41 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 LICENSE.md create mode 100644 examples/common/mod.rs create mode 100644 examples/simple.rs create mode 100644 src/error.rs create mode 100644 src/job.rs create mode 100644 src/registry.rs create mode 100644 src/runner.rs create mode 100644 src/spawn.rs create mode 100644 src/users.rs create mode 100644 src/utils.rs create mode 100644 tests/common/mod.rs create mode 100644 tests/general.rs create mode 100644 tests/ordering.rs create mode 100644 tests/stress.rs diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..b396d86 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,11 @@ +# Changelog + +All notable changes to this project will be documented in this file. 
+ +## [0.1.0] - 2022-05-29 + +### Features + +- Initial version + + diff --git a/Cargo.lock b/Cargo.lock index 9215761..60e45da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,6 +74,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "async-lock" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" +dependencies = [ + "event-listener", +] + [[package]] name = "async-trait" version = "0.1.53" @@ -129,6 +138,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" + [[package]] name = "bincode" version = "1.3.3" @@ -168,10 +183,39 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf40b65182c9d0f66b6584d77fc1476a8b00de8b63dfaf3c24faf813e9938bea" dependencies = [ + "bonsaidb-client", "bonsaidb-core", "bonsaidb-local", ] +[[package]] +name = "bonsaidb-client" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7940e083812e281c180c4d577a62848cecf1562f95b2b53c086ce5a62cb9361" +dependencies = [ + "async-lock", + "async-trait", + "bonsaidb-core", + "bonsaidb-utils", + "derive-where", + "fabruic", + "flume", + "futures", + "js-sys", + "log", + "once_cell", + "parking_lot 0.12.0", + "pot", + "serde", + "thiserror", + "tokio", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "bonsaidb-core" version = "0.4.1" @@ -216,7 +260,7 @@ dependencies = [ "log", "nebari", "p256", - "parking_lot", + "parking_lot 0.12.0", "pot", "rand", "serde", @@ -255,11 +299,14 @@ version = "0.1.0" dependencies = [ "bonsaidb", "color-eyre", + "getrandom", + "ntest", "serde", "serde_json", "thiserror", "time", "tokio", + "tokio-retry", "tracing", "tracing-futures", 
"tracing-subscriber", @@ -295,6 +342,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "circulate" version = "0.3.0" @@ -304,7 +361,7 @@ dependencies = [ "arc-bytes", "flume", "futures", - "parking_lot", + "parking_lot 0.12.0", "pot", "serde", ] @@ -342,6 +399,16 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b" +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.3" @@ -404,6 +471,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "ct-logs" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "362b2bf41dd138704dd7e4f253ecc5026287b0626a29fecbcbcd26cec8130e7d" +dependencies = [ + "sct", +] + [[package]] name = "darling" version = "0.13.4" @@ -439,6 +515,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" + [[package]] name = "der" version = "0.4.5" @@ -448,6 +530,30 @@ dependencies = [ "const-oid", ] +[[package]] +name = "der-oid-macro" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"c73af209b6a5dc8ca7cbaba720732304792cddc933cfea3d74509c2b1ef2f436" +dependencies = [ + "num-bigint", + "num-traits", + "syn", +] + +[[package]] +name = "der-parser" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cddf120f700b411b2b02ebeb7f04dc0b7c8835909a6c2f52bf72ed0dd3433b2" +dependencies = [ + "der-oid-macro", + "nom", + "num-bigint", + "num-traits", + "rusticata-macros", +] + [[package]] name = "derive-where" version = "1.0.0-rc.2" @@ -534,6 +640,39 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fabruic" +version = "0.0.1-dev.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29775a890f1feb8ce6fb7b5a5ddddf17d14e83faa6106e9b0cae36ab8e902740" +dependencies = [ + "async-trait", + "bincode", + "bytes", + "ct-logs", + "flume", + "futures-channel", + "futures-executor", + "futures-util", + "if_chain", + "parking_lot 0.11.2", + "pin-project", + "quinn", + "rcgen", + "ring", + "rustls", + "rustls-native-certs", + "serde", + "thiserror", + "time", + "tokio", + "url", + "webpki", + "webpki-roots", + "x509-parser", + "zeroize", +] + [[package]] name = "ff" version = "0.10.1" @@ -554,7 +693,7 @@ dependencies = [ "futures-sink", "nanorand", "pin-project", - "spin", + "spin 0.9.3", ] [[package]] @@ -563,6 +702,16 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +dependencies = [ + "matches", + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.21" @@ -652,6 +801,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.5" @@ -738,12 +896,38 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "idna" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "if_chain" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb56e1aa765b4b4f3aadfab769793b7087bb03a4ea4920644a6d238e2df5b9ed" + [[package]] name = "indenter" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "itertools" version = "0.10.3" @@ -817,12 +1001,24 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matches" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" + [[package]] name = "memchr" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.5.1" @@ -867,11 
+1063,21 @@ dependencies = [ "lru", "num_cpus", "once_cell", - "parking_lot", + "parking_lot 0.12.0", "thiserror", "tracing", ] +[[package]] +name = "nom" +version = "7.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "ntapi" version = "0.3.7" @@ -881,6 +1087,68 @@ dependencies = [ "winapi", ] +[[package]] +name = "ntest" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c544e496c816f0a59645c0bb69097e453df203954ae2ed4b3ac4251fad69d44" +dependencies = [ + "ntest_proc_macro_helper", + "ntest_test_cases", + "ntest_timeout", +] + +[[package]] +name = "ntest_proc_macro_helper" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f52e34b414605b77efc95c3f0ecef01df0c324bcc7f68d9a9cb7a7552777e52" + +[[package]] +name = "ntest_test_cases" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99a81eb400abc87063f829560bc5c5c835177703b83d1cd991960db0b2a00abe" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "ntest_timeout" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b10db009e117aca57cbfb70ac332348f9a89d09ff7204497c283c0f7a0c96323" +dependencies = [ + "ntest_proc_macro_helper", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "num-bigint" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +dependencies = [ + "autocfg", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -918,6 +1186,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "oid-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe554cb2393bc784fd678c82c84cc0599c31ceadc7f03a594911f822cb8d1815" +dependencies = [ + "der-parser", +] + [[package]] name = "once_cell" version = "1.12.0" @@ -930,6 +1207,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "ordered-varint" version = "1.0.1" @@ -953,6 +1236,17 @@ dependencies = [ "sha2 0.9.9", ] +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.5", +] + [[package]] name = "parking_lot" version = "0.12.0" @@ -960,7 +1254,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.3", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", ] [[package]] @@ -976,6 +1284,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "percent-encoding" 
+version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + [[package]] name = "pin-project" version = "1.0.10" @@ -1082,6 +1396,59 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quinn" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7542006acd6e057ff632307d219954c44048f818898da03113d6c0086bfddd9" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "fxhash", + "quinn-proto", + "quinn-udp", + "rustls", + "thiserror", + "tokio", + "tracing", + "webpki", +] + +[[package]] +name = "quinn-proto" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a13a5c0a674c1ce7150c9df7bc4a1e46c2fbbe7c710f56c0dc78b1a810e779e" +dependencies = [ + "bytes", + "fxhash", + "rand", + "ring", + "rustls", + "rustls-native-certs", + "rustls-pemfile 0.2.1", + "slab", + "thiserror", + "tinyvec", + "tracing", + "webpki", +] + +[[package]] +name = "quinn-udp" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3149f7237331015f1a6adf065c397d1be71e032fcf110ba41da52e7926b882f" +dependencies = [ + "futures-util", + "libc", + "quinn-proto", + "socket2", + "tokio", + "tracing", +] + [[package]] name = "quote" version = "1.0.18" @@ -1121,6 +1488,17 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rcgen" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5911d1403f4143c9d56a702069d593e8d0f3fab880a85e103604d0893ea31ba7" +dependencies = [ + "chrono", + "ring", + "yasna", +] + [[package]] name = "redox_syscall" version = "0.2.13" @@ -1154,24 +1532,132 @@ version = "0.6.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" +[[package]] +name = "ring" +version = "0.16.20" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin 0.5.2", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rustc-demangle" version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom", +] + +[[package]] +name = "rustls" +version = "0.20.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" +dependencies = [ + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.0", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9" +dependencies = [ + "base64", +] + [[package]] name = "ryu" version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" +[[package]] +name = "schannel" +version = "0.1.20" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" +dependencies = [ + "lazy_static", + "windows-sys", +] + [[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "security-framework" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.137" @@ -1277,6 +1763,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "spin" version = "0.9.3" @@ -1392,6 +1884,21 @@ dependencies = [ "num_threads", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" + [[package]] name = "tokio" version = "1.18.2" @@ -1404,7 +1911,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot", + "parking_lot 0.12.0", "pin-project-lite", "signal-hook-registry", "socket2", @@ -1423,6 +1930,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand", + "tokio", +] + [[package]] name = "toml" version = "0.5.9" @@ -1563,18 +2081,51 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +[[package]] +name = "unicode-bidi" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" + [[package]] name = "unicode-ident" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d22af068fba1eb5edcb4aea19d382b2a3deb4c8f9d475c589b6ada9e0fd493ee" +[[package]] +name = "unicode-normalization" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-xid" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + +[[package]] +name = "url" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" +dependencies = [ + "form_urlencoded", + "idna", + "matches", + "percent-encoding", +] + [[package]] name = "valuable" version = "0.1.0" @@ -1624,6 +2175,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f741de44b75e14c35df886aff5f1eb73aa114fa5d4d00dcd37b5e01259bf3b2" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.80" @@ -1661,10 +2224,39 @@ checksum = "3ff2d9b2cd6287507cf29e416658eb4a5b6dcf402774b4276b5682cc95fc78f2" dependencies = [ "event-listener", "futures-util", - "parking_lot", + "parking_lot 0.12.0", "thiserror", ] +[[package]] +name = "web-sys" +version = "0.3.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b17e741662c70c8bd24ac5c5b18de314a2c26c32bf8346ee1e6f53de919c283" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d8de8415c823c8abd270ad483c6feeac771fad964890779f9a8cb24fbbc1bf" +dependencies = [ + "webpki", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1739,6 +2331,32 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "x509-parser" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc90836a84cb72e6934137b1504d0cae304ef5d83904beb0c8d773bbfe256ed" +dependencies = 
[ + "base64", + "chrono", + "data-encoding", + "der-parser", + "lazy_static", + "nom", + "oid-registry", + "rusticata-macros", + "thiserror", +] + +[[package]] +name = "yasna" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e262a29d0e61ccf2b6190d7050d4b237535fc76ce4c1210d9caa316f71dffa75" +dependencies = [ + "chrono", +] + [[package]] name = "zeroize" version = "1.4.3" diff --git a/Cargo.toml b/Cargo.toml index cc88bb5..663ad62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,22 +1,46 @@ [package] +authors = ["Flix "] +categories = ["asynchronous", "concurrency", "database"] +description = "Message/job queue based on bonsaidb, similar to sqlxmq." +documentation = "https://docs.rs/bonsaimq" +edition = "2021" +homepage = "https://github.com/FlixCoder/bonsaimq" +keywords = ["message", "job", "queue", "database", "persistent"] +license = "MIT" name = "bonsaimq" +readme = "README.md" +repository = "https://github.com/FlixCoder/bonsaimq" +resolver = "2" version = "0.1.0" -edition = "2021" [dependencies] -bonsaidb = { version = "0.4.1", features = ["local", "local-async", "local-instrument"] } -serde = { version = "1.0.137", features = ["derive"] } +bonsaidb = "0.4.1" +getrandom = {version = "0.2.6", features = ["std"]} +serde = {version = "1.0.137", features = ["derive"]} serde_json = "1.0.81" thiserror = "1.0.31" time = "0.3.9" +tokio = {version = "1.18.0", features = ["rt", "time"]} +tokio-retry = "0.3.0" tracing = "0.1.34" tracing-futures = "0.2.5" [dev-dependencies] +bonsaidb = {version = "0.4.1", features = ["local", "local-async", "local-instrument", "client"]} color-eyre = "0.6.1" -tokio = { version = "1.18.0", features = ["full"] } -tracing-subscriber = { version = "0.3.11", features = ["env-filter"] } +ntest = "0.7.5" +tokio = {version = "1.18.0", features = ["full"]} +tracing-subscriber = {version = "0.3.11", features = ["env-filter"]} [profile.release] -lto = true debug = true +lto = true + +# Also test the 
examples +[[example]] +name = "simple" +path = "examples/simple.rs" +test = true + +[package.metadata.cargo-udeps.ignore] +normal = ["tracing-futures"] diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..d0035d4 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 FlixCoder + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md index 774b1c5..89d128a 100644 --- a/README.md +++ b/README.md @@ -2,17 +2,76 @@ Simple database message queue based on [bonsaidb](https://github.com/khonsulabs/bonsaidb). +The project is highly influenced by [sqlxmq](https://github.com/Diggsey/sqlxmq). + +Warning: This project is in early alpha and should not be used in production! 
+ ## Usage Import the project using: ```toml +# adjust the version to the latest version: +bonsaimq = "0.1.0" +# or bonsaimq = { git = "https://github.com/FlixCoder/bonsaimq" } ``` -## Examples +Then you can use the message/job queue as follows: + +- You need job handlers, which are async functions that receive one argument of type `CurrentJob` and return nothing. `CurrentJob` allows interfacing the job to retrieve job input or complete the job etc. +- The macro `job_regristy!` needs to be use to create a job registry, which maps message names/types to the job handlers and allows spawning new jobs. +- A job runner needs to be created and run on a bonsai database. It runs in the background as long as the handle is in scope and executes the jobs according to the incoming messages. It acts on the job registry. + +## Example + +Besides the following simple example, see the examples in the [examples folder](https://github.com/FlixCoder/bonsaimq/tree/main/examples/) and take a look at the tests. + +```rust +use bonsaidb::local::{ + config::{Builder, StorageConfiguration}, + AsyncDatabase, +}; +use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema}; +use color_eyre::Result; + +/// Example job function. It receives a handle to the current job, which gives +/// the ability to get the input payload, complete the job and more. +async fn greet(mut job: CurrentJob) { + // Load the JSON payload and make sure it is there. + let name: String = job.payload_json().expect("input should be given").expect("deserializing"); + println!("Hello {name}!"); + job.complete().await.expect("access to DB"); +} -Besides the following simple example, there are more examples in the [examples folder](./examples/) +// The JobRegistry provides a way to spawn new jobs and provides the interface +// for the JobRunner to find the functions to execute for the jobs. 
+job_registry!(JobRegistry, { + Greetings: "greet" => greet, +}); + +#[tokio::main] +async fn main() -> Result<()> { + // Open a local database for this example. + let db_path = "simple-doc-example.bonsaidb"; + let db = AsyncDatabase::open::(StorageConfiguration::new(db_path)).await?; + + // Start the job runner to execute jobs from the messages in the queue in the + // database. + let job_runner = JobRunner::new(db.clone()).run::(); + + // Spawn new jobs via a message on the database queue. + let job_id = JobRegistry::Greetings.builder().payload_json("cats")?.spawn(&db).await?; + + // Wait for job to finish execution, polling every 100 ms. + bonsaimq::await_job(job_id, 100, &db).await?; + + // Clean up. + job_runner.abort(); // Is done automatically on drop. + tokio::fs::remove_dir_all(db_path).await?; + Ok(()) +} +``` ## Lints diff --git a/examples/common/mod.rs b/examples/common/mod.rs new file mode 100644 index 0000000..cf8929d --- /dev/null +++ b/examples/common/mod.rs @@ -0,0 +1,22 @@ +//! Common example functions + +use std::sync::Once; + +use tracing::level_filters::LevelFilter; +use tracing_subscriber::EnvFilter; + +/// Initialize test environment. +pub fn init() { + /// Do init only once. + static SETUP: Once = Once::new(); + SETUP.call_once(|| { + color_eyre::install().expect("color-eyre"); + + let level_filter = LevelFilter::TRACE; + let filter = EnvFilter::from_default_env() + .add_directive(level_filter.into()) + .add_directive("want=warn".parse().expect("parse env filter")) + .add_directive("mio=warn".parse().expect("parse env filter")); + tracing_subscriber::fmt().with_test_writer().with_env_filter(filter).init(); + }); +} diff --git a/examples/simple.rs b/examples/simple.rs new file mode 100644 index 0000000..aeb2056 --- /dev/null +++ b/examples/simple.rs @@ -0,0 +1,62 @@ +//! Simple example. 
+#![allow(clippy::expect_used, unused_qualifications, clippy::unused_async, clippy::print_stdout)] + +mod common; + +use bonsaidb::local::{ + config::{Builder, StorageConfiguration}, + AsyncDatabase, +}; +use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema}; +use color_eyre::Result; + +/// Example job function. It receives a handle to the current job, which gives +/// the ability to get the input payload, complete the job and more. +async fn greet(mut job: CurrentJob) { + // Load the JSON payload and make sure it is there. + let name: String = job.payload_json().expect("input should be given").expect("deserializing"); + println!("Hello {name}!"); + job.complete().await.expect("access to DB"); +} + +/// Example job function 2 +async fn greet_german(_job: CurrentJob) { + panic!("This one fails, but doesn't kill anything! :|"); +} + +// The JobRegistry provides a way to spawn new jobs and provides the interface +// for the JobRunner to find the functions to execute for the jobs. +job_registry!(JobRegistry, { + cats: "cats" => greet, + Foxes: "foxes" => self::greet_german, +}); + +#[tokio::main] +async fn main() -> Result<()> { + common::init(); + + // Open a local database for this example. + let db_path = "simple-example.bonsaidb"; + let db = AsyncDatabase::open::(StorageConfiguration::new(db_path)).await?; + + // Start the job runner to execute jobs from the messages in the queue in the + // database. + let job_runner = JobRunner::new(db.clone()).run::(); + + // Spawn new jobs via a message on the database queue. + let job_id = JobRegistry::cats.builder().payload_json("cats")?.spawn(&db).await?; + JobRegistry::Foxes.builder().spawn(&db).await?; + + // Wait for job to finish execution. + bonsaimq::await_job(job_id, 100, &db).await?; + + job_runner.abort(); // Is done automatically on drop. 
+ tokio::fs::remove_dir_all(db_path).await?; + Ok(()) +} + +#[test] +#[ntest::timeout(10000)] +fn example_simple() { + main().expect("running main"); +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..f4dbabb --- /dev/null +++ b/src/error.rs @@ -0,0 +1,36 @@ +//! Crate errors. + +use std::num::TryFromIntError; + +use bonsaidb::core::{pubsub, Error as BonsaiError}; +use thiserror::Error; + +/// This crate's main error type. +#[derive(Debug, Error)] +pub enum Error { + /// BonsaiDB error. + #[error("Error interacting with BonsaiDB: {0}")] + BonsaiDb(#[from] BonsaiError), + /// PubSub disconnected error. + #[error("PubSub was disconnected: {0}")] + PubSubDisconnected(#[from] pubsub::Disconnected), + /// Getrandom error. + #[error("Getrandom error: {0}")] + Getrandom(#[from] getrandom::Error), + /// Integer conversion error. + #[error("Integer conversion error: {0}")] + TryFromInt(#[from] TryFromIntError), +} + +impl Error { + /// Whether the action that led to this error should likely be retried. + #[must_use] + pub fn should_retry(&self) -> bool { + #[allow(clippy::match_like_matches_macro)] + match self { + Self::BonsaiDb(BonsaiError::DocumentConflict(_, _)) => true, + Self::BonsaiDb(BonsaiError::Networking(_)) => true, + _ => false, + } + } +}
+#[derive(Debug)] +pub struct CurrentJob { + /// ID of this job (message ID). + pub(crate) id: Id, + /// Name of job (the message name/type). + pub(crate) name: &'static str, + /// Database handle. + pub(crate) db: JobRunnerHandler, + /// The job's input JSON payload. + pub(crate) payload_json: Option, + /// The job's input bytes payload. + pub(crate) payload_bytes: Option>, + /// Keep-alive job `JoinHandle`. + pub(crate) keep_alive: Option>>, +} + +impl CurrentJob { + /// Get the job's ID. + #[must_use] + pub fn id(&self) -> Id { + self.id + } + + /// Get the job's name. + #[must_use] + pub fn name(&self) -> &'static str { + self.name + } + + /// Get the job's JSON input (payload). + #[must_use] + pub fn payload_json(&self) -> Option> { + self.payload_json.as_ref().map(|payload| serde_json::from_value(payload.clone())) + } + + /// Get the job's byte input (payload). + #[must_use] + pub fn payload_bytes(&self) -> Option<&Vec> { + self.payload_bytes.as_ref() + } + + /// Keep alive this job by pushing forward the `attempt_at` field in the + /// database. + pub(crate) async fn keep_alive( + db: JobRunnerHandler, + id: Id, + ) -> Result>, Error> { + db.keep_alive(id).await?; + + let span = tracing::debug_span!("job-keep-alive"); + Ok(tokio::task::spawn( + async move { + loop { + let duration = RetryIf::spawn( + FixedInterval::from_millis(10).take(2), + || db.keep_alive(id), + Error::should_retry, + ) + .await?; + tokio::time::sleep(duration.div_f32(2.0)).await; + } + } + .instrument(span), + )) + } + + /// Job running function that handles retries as well etc.
+ pub(crate) fn run(self, mut function: JobFunctionType) -> JoinHandle> { + let span = tracing::debug_span!("job-run"); + tokio::task::spawn( + async move { + let id = self.id; + let db = self.db.clone(); + + tracing::trace!("Starting job with ID {id}."); + function(self).await; + + tracing::trace!("Updating job with ID {id} after execution."); + RetryIf::spawn( + FixedInterval::from_millis(10).take(2), + || db.job_update(id), + Error::should_retry, + ) + .await?; + tracing::trace!("Job with ID {id} finished execution."); + Ok(()) + } + .instrument(span), + ) + } + + /// Complete the job. Mark it as completed. Without doing this, it will + /// be retried! + pub async fn complete(&mut self) -> Result<(), Error> { + RetryIf::spawn( + FixedInterval::from_millis(10).take(2), + || self.db.complete(self.id), + Error::should_retry, + ) + .await?; + if let Some(keep_alive) = self.keep_alive.take() { + keep_alive.abort(); + }; + Ok(()) + } + + // TODO: Checkpoint capability. +} diff --git a/src/lib.rs b/src/lib.rs index b6ffbbd..0cbccdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,19 @@ -//! TODO: Crate doc +#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))] +mod error; +mod job; mod queue; +mod registry; +mod runner; +mod spawn; +mod users; +mod utils; + +pub use error::Error; +pub use job::CurrentJob; +pub use queue::{MessageQueueSchema, RetryTiming}; +pub use registry::{JobFunctionType, JobRegister}; +pub use runner::JobRunner; +pub use spawn::JobBuilder; +pub use users::*; +pub use utils::AbortOnDropHandle; diff --git a/src/queue.rs b/src/queue.rs index 5559284..3132c5e 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,4 +1,4 @@ -//! Database message queue implementation +//! Database storage for the message queue. use std::time::Duration; @@ -16,70 +16,91 @@ pub type Id = u128; /// Timestamp datatype (UNIX timestamp nanos). pub type Timestamp = i128; +/// PubSub channel for notifying of new messages to check. 
+pub(crate) const MQ_NOTIFY: &str = "message_queue_notify"; + /// Database schema for the message queue. #[derive(Debug, Schema)] #[schema(name = "message_queue", collections = [Message, MessagePayload])] pub struct MessageQueueSchema; /// The message queue's message metadata. -#[derive(Debug, Serialize, Deserialize, Collection)] -#[collection(name = "messages", primary_key = Id, views = [DueMessages, LatestMessage])] +#[derive(Debug, Clone, Serialize, Deserialize, Collection)] +#[collection( + name = "messages", + primary_key = Id, + natural_id = |msg: &Message| Some(msg.id), + views = [DueMessages, LatestMessage] +)] pub struct Message { + /// The message ID. + pub id: Id, /// Name of the message, i.e. a text message type identifier. - name: String, + pub name: String, /// Commit timestamp. - created_at: Timestamp, + pub created_at: Timestamp, /// Next execution timestamp. - attempt_at: Timestamp, + pub attempt_at: Timestamp, /// Number of executions tried. - executions: u32, + pub executions: u32, /// Number of retries to do. None = infinite. - max_retries: Option, + pub max_retries: Option, /// Strategy to determine time between retries. - retry_timing: RetryTiming, + pub retry_timing: RetryTiming, /// Whether or not the message is to be executed in ordered mode. Ordered /// messages are executed after other ordered messages, but unordered /// messages are executed immediately. - ordered: bool, + pub ordered: bool, /// Dependency, which needs to be finished before. References another /// message by ID. - execute_after: Option, + pub execute_after: Option, } /// Retry timing strategy. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum RetryTiming { /// Fixed timing between retries. Fixed(Duration), - /// Exponential back-off with maximum time + /// Exponential back-off with maximum time. Backoff { - /// Starting time in between retries + /// Initial time in between retries. 
initial: Duration, - /// Maximum time between retries + /// Maximum time between retries. maximum: Option, }, } /// The message queue's message payloads. -#[derive(Debug, Serialize, Deserialize, Collection)] -#[collection(name = "message_payloads", primary_key = Id)] +#[derive(Debug, Clone, Serialize, Deserialize, Collection)] +#[collection( + name = "message_payloads", + primary_key = Id, + natural_id = |payload: &MessagePayload| Some(payload.message_id) +)] pub struct MessagePayload { + /// The message ID. + pub message_id: Id, /// Message JSON payload. - payload_json: Option, + payload_json: Option, /// Message byte payload. - payload_bytes: Option>, + payload_bytes: Option>, } -/// Messages by their due time. +/// Messages by their due time. Reduces to the first message to be executed in +/// the key range. Query by key ranges to receive all due messages. Reduce to +/// receive the first message to be executed, which can be used to sleep until +/// then. #[derive(Debug, Clone, View)] -#[view(collection = Message, key = Timestamp, value = u32, name = "due_messages")] +#[view(collection = Message, key = Timestamp, value = Option, name = "due_messages")] pub struct DueMessages; impl CollectionViewSchema for DueMessages { type View = Self; fn map(&self, document: CollectionDocument) -> ViewMapResult { - document.header.emit_key_and_value(document.contents.attempt_at, 1) + document + .header + .emit_key_and_value(document.contents.attempt_at, Some(document.contents.attempt_at)) } fn reduce( @@ -87,7 +108,7 @@ impl CollectionViewSchema for DueMessages { mappings: &[ViewMappedValue], _rereduce: bool, ) -> ReduceResult { - Ok(mappings.iter().map(|view| view.value).sum()) + Ok(mappings.iter().filter_map(|view| view.value).min()) } fn version(&self) -> u64 { @@ -95,8 +116,9 @@ } } -/// Latest Message that is in ordered mode and should be executed before a new -/// one.
+/// Latest Message view that reduces to messages in ordered mode that should be +/// executed before a new one. This should be used to find the dependency for +/// new ordered messages. #[derive(Debug, Clone, View)] #[view(collection = Message, key = Timestamp, value = Option, name = "latest_message")] pub struct LatestMessage; @@ -131,3 +153,32 @@ impl CollectionViewSchema for LatestMessage { 0 } } + +impl RetryTiming { + /// Compute the next retry duration based on the number of executions + /// already done. So for the first retry (second execution), executions is + /// supposed to be zero. + #[must_use] + pub fn next_duration(&self, executions: u32) -> Duration { + match *self { + RetryTiming::Fixed(fixed) => fixed, + RetryTiming::Backoff { initial, maximum } => { + let duration = initial.saturating_mul(2_u32.saturating_pow(executions)); + if let Some(max) = maximum { + duration.min(max) + } else { + duration + } + } + } + } +} + +/// Generate a new ID. +pub(crate) fn generate_id() -> Result { + let mut buf = [0_u8; std::mem::size_of::()]; + getrandom::getrandom(&mut buf)?; + // SAFETY: Safe because we made sure it has the correct size using size_of. + let id = unsafe { std::mem::transmute(buf) }; + Ok(id) +} diff --git a/src/registry.rs b/src/registry.rs new file mode 100644 index 0000000..2605c98 --- /dev/null +++ b/src/registry.rs @@ -0,0 +1,115 @@ +//! Job registry for finding the code to execute based on incoming messages of +//! specified types. Allows to spawn new jobs / messages using +//! `JobRegistry::Handle.builder().spawn().await?`. + +use std::{future::Future, pin::Pin}; + +use crate::{spawn::JobBuilder, CurrentJob}; + +/// Function type of the jobs returned by the job registry. +pub type JobFunctionType = + Box Pin + Send>> + Send>; + +/// Functions the registry exposes. +pub trait JobRegister: Sized { + /// Spawn a new job/message using this builder. + fn builder(self) -> JobBuilder; + /// Return the message name of this message/job. 
+ fn name(&self) -> &'static str; + /// Get the registry entry based on message name. Returns None if not found. + fn from_name(name: &str) -> Option; + /// Return the handler for this message/job. + fn function(&self) -> JobFunctionType; +} + +/// Creates a job registry with the given name as first parameter. The second +/// parameter is a named map of message names to functions, which are executed +/// for the message. +/// +/// Example: +/// ``` +/// # use bonsaimq::{job_registry, CurrentJob}; +/// async fn async_message_handler_fn(_job: CurrentJob) {} +/// +/// job_registry!(JobRegistry, { +/// Ident: "message_name" => async_message_handler_fn, +/// }); +/// ``` +#[macro_export] +macro_rules! job_registry { + ( + $reg_name:ident, + {$($msg_fn_name:ident: $msg_name:literal => $msg_fn:path),*$(,)?} + ) => { + #[doc = "Job Registry"] + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub enum $reg_name { + $( + #[doc = concat!("`", $msg_name, "` leading to `", stringify!($msg_fn), "`.")] + #[allow(non_camel_case_types)] + $msg_fn_name + ),* + } + + impl $crate::JobRegister for $reg_name { + #[inline] + fn builder(self) -> $crate::JobBuilder { + match self { + $(Self::$msg_fn_name => $crate::JobBuilder::new($msg_name)),* + } + } + + /// Return the message name of this message/job + #[inline] + fn name(&self) -> &'static str { + match *self { + $(Self::$msg_fn_name => $msg_name),* + } + } + + /// Get the registry entry based on message name. Returns None if not found. + #[inline] + fn from_name(name: &str) -> Option { + match name { + $($msg_name => Some(Self::$msg_fn_name)),*, + _ => None, + } + } + + /// Return the function of this message/job + #[inline] + fn function(&self) -> $crate::JobFunctionType { + match *self { + $(Self::$msg_fn_name => Box::new(|job| Box::pin($msg_fn(job)))),* + } + } + } + }; +} + +/// Test correct macro type referencing and implementation. See if it compiles. 
+#[cfg(test)] +mod tests { + #![allow(clippy::expect_used, unused_qualifications, clippy::unused_async)] + + use super::*; + use crate::job_registry; + + job_registry!(JobRegistry, { + some_fn: "cats" => some_fn, + OtherFn: "foxes" => self::some_other_fn, + }); + + async fn some_fn(_job: CurrentJob) {} + async fn some_other_fn(_job: CurrentJob) {} + + #[test] + fn test_job_registry() { + let name = JobRegistry::some_fn.name(); + assert_eq!(name, "cats"); + + let _function = JobRegistry::from_name("foxes").expect("name was set").function(); + + let _builder = JobRegistry::some_fn.builder(); + } +} diff --git a/src/runner.rs b/src/runner.rs new file mode 100644 index 0000000..af03e54 --- /dev/null +++ b/src/runner.rs @@ -0,0 +1,235 @@ +//! Connector to the database which runs code based on the messages and their +//! type. + +use std::{fmt::Debug, sync::Arc, time::Duration}; + +use bonsaidb::core::{ + async_trait::async_trait, + connection::AsyncConnection, + document::CollectionDocument, + pubsub::{AsyncPubSub, AsyncSubscriber}, + schema::{view::map::MappedDocuments, Collection, SerializedCollection}, + transaction::{Operation, Transaction}, + Error as BonsaiError, +}; +use time::OffsetDateTime; + +use crate::{ + queue::{DueMessages, Id, Message, MessagePayload, Timestamp, MQ_NOTIFY}, + AbortOnDropHandle, CurrentJob, Error, JobRegister, +}; + +/// Job Runner. This is the job execution system to be run in the background. It +/// runs on the specified database and using a specific job registry. +#[derive(Debug)] +pub struct JobRunner { + /// The database handle. + db: Arc, +} + +impl Clone for JobRunner { + fn clone(&self) -> Self { + Self { db: self.db.clone() } + } +} + +impl JobRunner +where + DB: AsyncConnection + AsyncPubSub + Debug + 'static, +{ + /// Create a new job runner on this database. + pub fn new(db: DB) -> Self { + Self { db: Arc::new(db) } + } + + /// Get messages that are due at the specified time. 
+ async fn due_messages( + &self, + due_at: Timestamp, + ) -> Result, DueMessages>, BonsaiError> { + self.db.view::().with_key_range(..due_at).query_with_collection_docs().await + } + + /// Get the duration until the next message is due. + async fn next_message_due_in(&self, from: Timestamp) -> Result { + let nanos = self + .db + .view::() + .with_key_range(from..) + .reduce() + .await? + .map_or(10_000_000_000, |target| target - from); + let duration = Duration::from_nanos(nanos.clamp(0, u64::MAX.into()) as u64); + Ok(duration) + } + + /// Get the message payloads for the specified message (ID). + async fn message_payloads( + &self, + id: Id, + ) -> Result<(Option, Option>), BonsaiError> { + Ok(MessagePayload::get_async(id, self.db.as_ref()).await?.map_or((None, None), |payload| { + (payload.contents.payload_json, payload.contents.payload_bytes) + })) + } + + /// Spawn and run the daemon for processing messages/jobs in the background. + /// Keep this handle as long as you want jobs to be executed in the + /// background! You can also use and await the handle like normal + /// [`JoinHandle`](tokio::task::JoinHandle)s. + #[must_use] + pub fn run(self) -> AbortOnDropHandle> + where + REG: JobRegister + Send + Sync + 'static, + { + tokio::task::spawn(self.job_queue::()).into() + } + + /// Internal job queue runner. 
+ #[tracing::instrument(level = "debug", skip_all, err)] + async fn job_queue(self) -> Result<(), Error> + where + REG: JobRegister + Send + Sync, + DB::Subscriber: AsyncSubscriber, + { + tracing::debug!("Running JobRunner.."); + let subscriber = self.db.create_subscriber().await?; + subscriber.subscribe_to(&MQ_NOTIFY).await?; + + loop { + // Retrieve due messages + let now = OffsetDateTime::now_utc().unix_timestamp_nanos(); + let messages = self.due_messages(now).await?; + tracing::trace!("Found {} due messages.", messages.len()); + + // Execute jobs for the messages + for msg in &messages { + if let Some(job) = REG::from_name(&msg.document.contents.name) { + // Filter out messages with active dependencies + if let Some(dependency) = msg.document.contents.execute_after { + if Message::get_async(dependency, self.db.as_ref()).await?.is_some() { + continue; + } + } + + // Keep alive, load payload and start the job + let keep_alive = + CurrentJob::keep_alive(Arc::new(self.clone()), msg.document.contents.id) + .await?; + + let payloads = self.message_payloads(msg.document.contents.id).await?; + let current_job = CurrentJob { + id: msg.document.contents.id, + name: job.name(), + db: Arc::new(self.clone()), + payload_json: payloads.0, + payload_bytes: payloads.1, + keep_alive: Some(keep_alive.into()), + }; + + // TODO: Do something with the job handle? + let _jh = current_job.run(job.function()); + } else { + // TODO: Just silently ignore? + tracing::trace!( + "Job {} is not registered and will be ignored.", + msg.document.contents.name + ); + } + } + + // Sleep until the next message is due or a notification comes in. + let next_due_in = self.next_message_due_in(now).await?; + tokio::time::timeout(next_due_in, subscriber.receiver().receive_async()) + .await + .ok() // Timeout is not a failure + .transpose()?; + } + } +} + +/// JobRunner handle for the jobs. Workaround for putting the database into +/// CurrentJob, which requires generics.
Performs all the necessary database +/// access for the jobs. +#[async_trait] +pub(crate) trait JobRunnerHandle: Debug { + /// Complete the job with the specified ID. + async fn complete(&self, id: Id) -> Result<(), Error>; + /// Keep the job alive. Updates the job's database message to avoid multiple + /// concurrent executions. + async fn keep_alive(&self, id: Id) -> Result; + /// Job update function, that updates the job's database message for the + /// next retry after job execution. + async fn job_update(&self, id: Id) -> Result<(), Error>; +} + +#[async_trait] +impl JobRunnerHandle for JobRunner +where + DB: AsyncConnection + AsyncPubSub + 'static, +{ + #[tracing::instrument(level = "debug", skip(self))] + async fn complete(&self, id: Id) -> Result<(), Error> { + tracing::trace!("Completing job {id}."); + + let del_message = Message::get_async(id, self.db.as_ref()).await?.map(|msg| msg.header); + let del_payload = + MessagePayload::get_async(id, self.db.as_ref()).await?.map(|payload| payload.header); + + let mut tx = Transaction::new(); + if let Some(header) = del_message { + tx.push(Operation::delete(Message::collection_name(), header.try_into()?)); + } + if let Some(header) = del_payload { + tx.push(Operation::delete(MessagePayload::collection_name(), header.try_into()?)); + } + match tx.apply_async(self.db.as_ref()).await { + Err(BonsaiError::DocumentNotFound(_, _)) => {} + Err(err) => return Err(err.into()), + Ok(_) => {} + }; + + self.db.publish(&MQ_NOTIFY, &()).await?; + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn keep_alive(&self, id: Id) -> Result { + if let Some(mut message) = Message::get_async(id, self.db.as_ref()).await? 
{ + tracing::trace!("Keeping job {id} alive."); + + let duration = message.contents.retry_timing.next_duration(message.contents.executions); + let now = OffsetDateTime::now_utc().unix_timestamp_nanos(); + + message.contents.attempt_at = now + Timestamp::try_from(duration.as_nanos())?; + message.update_async(self.db.as_ref()).await?; + + Ok(duration) + } else { + Ok(Duration::default()) + } + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn job_update(&self, id: Id) -> Result<(), Error> { + if let Some(mut message) = Message::get_async(id, self.db.as_ref()).await? { + tracing::trace!("Updating job {id} for retry."); + + if message.contents.max_retries.map_or(false, |max| message.contents.executions >= max) + { + return self.complete(id).await; + } + + let duration = message.contents.retry_timing.next_duration(message.contents.executions); + let now = OffsetDateTime::now_utc().unix_timestamp_nanos(); + message.contents.attempt_at = now + Timestamp::try_from(duration.as_nanos())?; + message.contents.executions += 1; + + message.update_async(self.db.as_ref()).await?; + } + Ok(()) + } + + // TODO: Checkpoint capability. +} diff --git a/src/spawn.rs b/src/spawn.rs new file mode 100644 index 0000000..413c5c4 --- /dev/null +++ b/src/spawn.rs @@ -0,0 +1,179 @@ +//! Job spawning capabilities. + +use std::time::Duration; + +use bonsaidb::core::{ + connection::AsyncConnection, + pubsub::AsyncPubSub, + transaction::{Operation, Transaction}, +}; +use serde::Serialize; +use time::OffsetDateTime; + +use crate::{ + queue::{ + generate_id, Id, LatestMessage, Message, MessagePayload, RetryTiming, Timestamp, MQ_NOTIFY, + }, + Error, +}; + +/// Builder for spawning a job. By default, `ordered` mode is off and infinite +/// retries with capped exponential backoff is used (1 second initially, +/// maximum 1 hour between tries). +#[derive(Debug, Clone)] +pub struct JobBuilder { + /// Message name/type. + name: &'static str, + /// Message ID. 
+ id: Option, + /// Whether the job should be executed in ordered mode. + ordered: bool, + /// Initial execution delay. + delay: Option, + /// Maximum amount of retries. + max_retries: Option, + /// Retry timing, i.e. how much time should be in between job retries. + retry_timing: RetryTiming, + /// JSON payload. + payload_json: Option, + /// Byte payload. + payload_bytes: Option>, +} + +impl JobBuilder { + /// Create new [`JobBuilder`]. By default, `ordered` mode is off and + /// infinite retries with capped exponential backoff is used (1 second + /// initially, maximum 1 hour between tries). + #[must_use] + pub fn new(name: &'static str) -> Self { + Self { + name, + id: None, + ordered: false, + delay: None, + max_retries: None, + retry_timing: RetryTiming::Backoff { + initial: Duration::from_secs(1), + maximum: Some(Duration::from_secs(60 * 60)), + }, + payload_json: None, + payload_bytes: None, + } + } + + /// Set the message's ID. If not set, a new random one will be generated. + #[must_use] + #[inline] + pub fn id(mut self, id: Id) -> Self { + self.id = Some(id); + self + } + + /// Set whether ordered mode should be used. Ordered messages can only be + /// executed after the previous ordered message, but unordered messages can + /// always be executed independently. + #[must_use] + #[inline] + pub fn ordered(mut self, ordered: bool) -> Self { + self.ordered = ordered; + self + } + + /// Set initial execution delay. + #[must_use] + #[inline] + pub fn delay(mut self, delay: impl Into>) -> Self { + self.delay = delay.into(); + self + } + + /// Set the maximum number of retries. None = infinite retrying. + #[must_use] + #[inline] + pub fn max_retries(mut self, max_retries: impl Into>) -> Self { + self.max_retries = max_retries.into(); + self + } + + /// Set the retry timing strategy. See [`RetryTiming`] for the possible + /// values.
+ #[must_use] + #[inline] + pub fn retry_timing(mut self, timing: RetryTiming) -> Self { + self.retry_timing = timing; + self + } + + /// Set JSON payload. If not set, there will be no JSON input to the job, + /// but there can still be byte data. The payloads are independent. + #[inline] + pub fn payload_json(mut self, payload: S) -> Result { + let value = serde_json::to_value(payload)?; + self.payload_json = Some(value); + Ok(self) + } + + /// Set byte payload. If not set, there will be no byte input to the job, + /// but there can still be JSON data. The payloads are independent. + #[must_use] + #[inline] + pub fn payload_bytes(mut self, payload: Vec) -> Self { + self.payload_bytes = Some(payload); + self + } + + /// Prepare the database entries. + async fn prepare_db_entries(self, db: &DB) -> Result<(Message, MessagePayload), Error> + where + DB: AsyncConnection, + { + let execute_after = + if self.ordered { db.view::().reduce().await? } else { None }; + + let id = self.id.map_or_else(generate_id, Ok)?; + + let now = OffsetDateTime::now_utc().unix_timestamp_nanos(); + let attempt_at = self + .delay + .map(|delay| Timestamp::try_from(delay.as_nanos())) + .transpose()? + .map_or(now, |delay| now + delay); + + let message = Message { + id, + name: self.name.to_owned(), + created_at: now, + attempt_at, + executions: 0, + max_retries: self.max_retries, + retry_timing: self.retry_timing, + ordered: self.ordered, + execute_after, + }; + let payload = MessagePayload { + message_id: id, + payload_json: self.payload_json, + payload_bytes: self.payload_bytes, + }; + + Ok((message, payload)) + } + + /// Spawn the job into the message queue on the database. + #[tracing::instrument(level = "debug", skip_all)] + pub async fn spawn(self, db: &DB) -> Result + where + DB: AsyncConnection + AsyncPubSub, + { + let (message, payload) = self.prepare_db_entries(db).await?; + Transaction::new() + .with(Operation::push_serialized::(&message)?) 
+ .with(Operation::push_serialized::(&payload)?) + .apply_async(db) + .await?; + + db.publish(&MQ_NOTIFY, &()).await?; + + Ok(message.id) + } +} diff --git a/src/users.rs b/src/users.rs new file mode 100644 index 0000000..e61fab9 --- /dev/null +++ b/src/users.rs @@ -0,0 +1,31 @@ +//! Functions for the library users. + +use std::time::Duration; + +use bonsaidb::core::{connection::AsyncConnection, schema::SerializedCollection}; + +use crate::{ + queue::{Id, Message}, + Error, +}; + +/// Check whether the job with the given ID exists. Can also be used to check if +/// a job has finished already. +pub async fn job_exists(id: Id, db: &DB) -> Result { + Ok(Message::get_async(id, db).await?.is_some()) +} + +/// Wait for a job to finish using a fixed interval to check if it exists. +pub async fn await_job( + id: Id, + interval_ms: u64, + db: &DB, +) -> Result<(), Error> { + let mut int = tokio::time::interval(Duration::from_millis(interval_ms)); + loop { + if !job_exists(id, db).await? { + return Ok(()); + } + int.tick().await; + } +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..7455304 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,35 @@ +//! Utility and helper functionality for the library. + +use std::ops::{Deref, DerefMut}; + +use tokio::task::JoinHandle; + +/// [`JoinHandle`] that is aborted on [`Drop`]. +#[derive(Debug)] +pub struct AbortOnDropHandle(pub JoinHandle); + +impl Drop for AbortOnDropHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +impl From> for AbortOnDropHandle { + fn from(jh: JoinHandle) -> Self { + Self(jh) + } +} + +impl Deref for AbortOnDropHandle { + type Target = JoinHandle; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for AbortOnDropHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs new file mode 100644 index 0000000..5d77744 --- /dev/null +++ b/tests/common/mod.rs @@ -0,0 +1,21 @@ +//! 
Common test functions + +use std::sync::Once; + +use tracing::level_filters::LevelFilter; +use tracing_subscriber::EnvFilter; + +/// Initialize test environment. +pub fn init() { + static SETUP: Once = Once::new(); + SETUP.call_once(|| { + color_eyre::install().expect("color-eyre"); + + let level_filter = LevelFilter::TRACE; + let filter = EnvFilter::from_default_env() + .add_directive(level_filter.into()) + .add_directive("want=warn".parse().expect("parse env filter")) + .add_directive("mio=warn".parse().expect("parse env filter")); + tracing_subscriber::fmt().with_test_writer().with_env_filter(filter).init(); + }); +} diff --git a/tests/general.rs b/tests/general.rs new file mode 100644 index 0000000..9dee954 --- /dev/null +++ b/tests/general.rs @@ -0,0 +1,74 @@ +//! General tests and simple cases. +#![allow(clippy::expect_used, clippy::unused_async)] + +mod common; + +use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::Duration, +}; + +use bonsaidb::local::{ + config::{Builder, StorageConfiguration}, + AsyncDatabase, +}; +use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema}; +use color_eyre::Result; + +/// Counter +static COUNT: AtomicUsize = AtomicUsize::new(0); + +/// Counting handler that does not complete the job, so should be retried. 
+async fn counter(_job: CurrentJob) { + COUNT.fetch_add(1, Ordering::SeqCst); +} + +job_registry!(JobRegistry, { + Counter: "counter" => counter, +}); + +#[tokio::test] +#[ntest::timeout(10000)] +async fn same_id() -> Result<()> { + common::init(); + + let db_path = "same-id-test.bonsaidb"; + tokio::fs::remove_dir_all(db_path).await.ok(); + let db = AsyncDatabase::open::(StorageConfiguration::new(db_path)).await?; + + let id = JobRegistry::Counter.builder().delay(Duration::from_secs(999)).spawn(&db).await?; + let res = JobRegistry::Counter.builder().id(id).spawn(&db).await; + + assert!(res.is_err()); + + tokio::fs::remove_dir_all(db_path).await?; + Ok(()) +} + +#[tokio::test] +#[ntest::timeout(30000)] +async fn retrying() -> Result<()> { + common::init(); + + let db_path = "retrying-test.bonsaidb"; + tokio::fs::remove_dir_all(db_path).await.ok(); + let db = AsyncDatabase::open::(StorageConfiguration::new(db_path)).await?; + let job_runner = JobRunner::new(db.clone()).run::(); + + let n = 4; + let id = JobRegistry::Counter + .builder() + .id(123_456_789) + .max_retries(n) + .retry_timing(bonsaimq::RetryTiming::Fixed(Duration::from_millis(10))) + .spawn(&db) + .await?; + + bonsaimq::await_job(id, 100, &db).await?; + + assert_eq!(COUNT.load(Ordering::SeqCst), n as usize + 1); + + job_runner.abort(); + tokio::fs::remove_dir_all(db_path).await?; + Ok(()) +} diff --git a/tests/ordering.rs b/tests/ordering.rs new file mode 100644 index 0000000..4587cb3 --- /dev/null +++ b/tests/ordering.rs @@ -0,0 +1,58 @@ +//! Testing that job order works as expected. +#![allow(clippy::expect_used)] + +mod common; + +use std::sync::atomic::{AtomicUsize, Ordering}; + +use bonsaidb::local::{ + config::{Builder, StorageConfiguration}, + AsyncDatabase, +}; +use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema}; +use color_eyre::Result; + +/// Counter +static COUNT: AtomicUsize = AtomicUsize::new(0); + +/// Ordered counting handler. 
+async fn counter(mut job: CurrentJob) { + let expected = job.payload_bytes().expect("byte payload")[0]; + let nr = COUNT.fetch_add(1, Ordering::SeqCst); + assert_eq!(nr, expected as usize); + job.complete().await.expect("completing job"); +} + +job_registry!(JobRegistry, { + Counter: "counter" => counter, +}); + +#[tokio::test] +#[ntest::timeout(60000)] +async fn job_order() -> Result<()> { + common::init(); + + let db_path = "ordering-test.bonsaidb"; + tokio::fs::remove_dir_all(db_path).await.ok(); + let db = AsyncDatabase::open::(StorageConfiguration::new(db_path)).await?; + let job_runner = JobRunner::new(db.clone()).run::(); + + let n = 100_u8; + let mut jobs = Vec::with_capacity(n as usize); + for i in 0..n { + let job_id = JobRegistry::Counter.builder().payload_bytes(vec![i]).spawn(&db).await?; + jobs.push(job_id); + } + + // Wait for jobs to finish + for job_id in jobs { + bonsaimq::await_job(job_id, 100, &db).await?; + } + + let value = COUNT.load(Ordering::SeqCst); + assert_eq!(value, n as usize); + + job_runner.abort(); + tokio::fs::remove_dir_all(db_path).await?; + Ok(()) +} diff --git a/tests/stress.rs b/tests/stress.rs new file mode 100644 index 0000000..8fa5a0f --- /dev/null +++ b/tests/stress.rs @@ -0,0 +1,60 @@ +//! Stress-test, making sure everything still works as expected +#![allow(clippy::expect_used)] + +mod common; + +use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::Duration, +}; + +use bonsaidb::local::{ + config::{Builder, StorageConfiguration}, + AsyncDatabase, +}; +use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema}; +use color_eyre::Result; + +/// Counter +static COUNT: AtomicUsize = AtomicUsize::new(0); + +/// Counting handler. 
+async fn counter(mut job: CurrentJob) { + COUNT.fetch_add(1, Ordering::SeqCst); + job.complete().await.expect("completing job"); +} + +job_registry!(JobRegistry, { + Counter: "counter" => counter, +}); + +#[tokio::test] +#[ntest::timeout(60000)] +async fn stress_test() -> Result<()> { + common::init(); + + let db_path = "stress-test.bonsaidb"; + tokio::fs::remove_dir_all(db_path).await.ok(); + let db = AsyncDatabase::open::(StorageConfiguration::new(db_path)).await?; + let job_runner = JobRunner::new(db.clone()).run::(); + + let n = 100; + let mut jobs = Vec::with_capacity(n); + for _ in 0..n { + let job_id = + JobRegistry::Counter.builder().delay(Duration::from_millis(500)).spawn(&db).await?; + jobs.push(job_id); + } + + // Wait for jobs to finish + for job_id in jobs { + bonsaimq::await_job(job_id, 100, &db).await?; + } + + let value = COUNT.load(Ordering::SeqCst); + assert_eq!(value, n); + + job_runner.abort(); + tokio::fs::remove_dir_all(db_path).await?; + Ok(()) +}