From ec0470cddc270a9753f6a9d321ee5d06092562a2 Mon Sep 17 00:00:00 2001 From: Github Automation Date: Thu, 18 Apr 2024 17:14:22 -0600 Subject: [PATCH] chore: testing with loom and poll --- Cargo.lock | 385 ++++++++--------- Cargo.toml | 8 +- Makefile | 5 +- p2p/Cargo.toml | 5 + p2p/src/behaviour.rs | 150 +++++++ p2p/src/behaviour/ceramic_peer_manager.rs | 61 ++- recon/Cargo.toml | 14 +- recon/src/lib.rs | 12 +- recon/src/libp2p.rs | 40 +- recon/src/libp2p/tests.rs | 408 +++++++----------- recon/src/protocol.rs | 3 +- recon/src/recon.rs | 13 +- recon/src/recon/tests.rs | 8 +- recon/src/sha256a.rs | 2 +- recon/src/{recon => test_utils}/btreestore.rs | 8 +- recon/src/test_utils/mock_or_real.rs | 279 ++++++++++++ recon/src/test_utils/mocks.rs | 278 ++++++++++++ recon/src/{tests.rs => test_utils/mod.rs} | 32 +- recon/src/test_utils/test_behaviour.rs | 325 ++++++++++++++ recon/src/test_utils/test_swarm.rs | 197 +++++++++ store/src/metrics.rs | 4 +- store/src/sql/event.rs | 4 +- store/src/sql/interest.rs | 7 +- 23 files changed, 1746 insertions(+), 502 deletions(-) rename recon/src/{recon => test_utils}/btreestore.rs (96%) create mode 100644 recon/src/test_utils/mock_or_real.rs create mode 100644 recon/src/test_utils/mocks.rs rename recon/src/{tests.rs => test_utils/mod.rs} (79%) create mode 100644 recon/src/test_utils/test_behaviour.rs create mode 100644 recon/src/test_utils/test_swarm.rs diff --git a/Cargo.lock b/Cargo.lock index d61f38f35..540b23d09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,7 +24,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" dependencies = [ "crypto-common", - "generic-array 0.14.7", + "generic-array", ] [[package]] @@ -290,7 +290,7 @@ checksum = "136d4d23bcc79e27423727b36823d86233aad06dfea531837b038394d11e9928" dependencies = [ "concurrent-queue", "event-listener 5.3.0", - "event-listener-strategy 0.5.1", + "event-listener-strategy 0.5.2", "futures-core", "pin-project-lite", ] @@ -303,7 +303,7 @@ checksum = "b10202063978b3351199d68f8b22c4e47e4b1b822f8d43fd862d5ea8c006b29a" dependencies = [ "async-task", "concurrent-queue", - "fastrand 2.0.2", + "fastrand 2.1.0", "futures-lite 2.3.0", "slab", ] @@ -491,9 +491,9 @@ dependencies = [ [[package]] name = "async-task" -version = "4.7.0" +version = "4.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" @@ -828,25 +828,13 @@ dependencies = [ "constant_time_eq 0.3.0", ] -[[package]] -name = "block-buffer" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" -dependencies = [ - "block-padding", - "byte-tools", - "byteorder", - "generic-array 0.12.4", -] - [[package]] name = "block-buffer" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" dependencies = [ - "generic-array 0.14.7", + "generic-array", ] [[package]] @@ -855,39 +843,28 @@ version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ - "generic-array 0.14.7", -] - -[[package]] -name = "block-padding" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" -dependencies = [ - "byte-tools", + "generic-array", ] [[package]] name = "blocking" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" +checksum = "495f7104e962b7356f0aeb34247aca1fe7d2e783b346582db7f2904cb5717e88" dependencies = [ "async-channel 2.2.1", "async-lock 3.3.0", "async-task", - "fastrand 2.0.2", "futures-io", "futures-lite 2.3.0", "piper", - "tracing", ] [[package]] name = "bpaf" -version = "0.9.11" +version = "0.9.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567fc5f0a754100df11b167b2a247b2366fc1ac18e9b776a07659be00878f681" +checksum = "3280efcf6d66bc77c2cf9b67dc8acee47a217d9be67dd590b3230dffe663724d" [[package]] name = "bs58" @@ -923,12 +900,6 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" -[[package]] -name = "byte-tools" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" - [[package]] name = "byteorder" version = "1.5.0" @@ -1043,9 +1014,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.95" +version = "1.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b" +checksum = "065a29261d53ba54260972629f9ca6bffa69bac13cd1fed61420f7fa68b9f8bd" dependencies = [ "jobserver", "libc", @@ -1331,7 +1302,9 @@ dependencies = [ "iroh-rpc-types", "libp2p", "libp2p-identity", + "loom", "lru 0.10.1", + "mockall", "prometheus-client", "rand 0.8.5", "rand_chacha 0.3.1", @@ -1585,9 +1558,9 @@ checksum = "337cdbf3f1a0e643b4a7d1a2ffa39d22342fb6ee25739b5cfb997c28b3586422" [[package]] name = "concurrent-queue" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" dependencies = [ "crossbeam-utils", ] @@ -1801,7 +1774,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03c6a1d5fa1de37e071642dfa44ec552ca5b299adb128fab16138e24b548fd21" dependencies = [ - "generic-array 0.14.7", + "generic-array", "subtle", ] @@ -1811,7 +1784,7 @@ version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" dependencies = [ - "generic-array 0.14.7", + "generic-array", "rand_core 0.6.4", "subtle", "zeroize", @@ -1823,7 +1796,7 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" dependencies = [ - "generic-array 0.14.7", + "generic-array", "rand_core 0.6.4", "subtle", "zeroize", @@ -1835,7 +1808,7 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ - "generic-array 0.14.7", + "generic-array", "rand_core 0.6.4", "typenum", ] @@ -2012,15 +1985,15 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "data-encoding-macro" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20c01c06f5f429efdf2bae21eb67c28b3df3cf85b7dd2d8ef09c0838dac5d33e" +checksum = "f1559b6cba622276d6d63706db152618eeb15b89b3e4041446b05876e352e639" dependencies = [ "data-encoding", "data-encoding-macro-internal", @@ -2028,9 +2001,9 @@ dependencies = [ [[package]] name = "data-encoding-macro-internal" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0047d07f2c89b17dd631c80450d69841a6b5d7fb17278cbc43d7e4cfcf2576f3" +checksum = "332d754c0af53bc87c108fed664d121ecf59207ec4196041f04d6ab9002ad33f" dependencies = [ "data-encoding", "syn 1.0.109", @@ -2196,28 +2169,13 @@ dependencies = [ "static-iref", ] -[[package]] -name = "difflib" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" - -[[package]] -name = "digest" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" -dependencies = [ - "generic-array 0.12.4", -] - [[package]] name = "digest" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" dependencies = [ - "generic-array 0.14.7", + "generic-array", ] [[package]] @@ -2403,7 +2361,7 @@ dependencies = [ "der 0.6.1", "digest 0.10.7", "ff 0.12.1", - "generic-array 0.14.7", + "generic-array", "group 0.12.1", "rand_core 0.6.4", "sec1 0.3.0", @@ -2421,7 +2379,7 @@ dependencies = [ "crypto-bigint 0.5.5", "digest 0.10.7", "ff 0.13.0", - "generic-array 0.14.7", + "generic-array", "group 0.13.0", "pem-rfc7468 0.7.0", "pkcs8 0.10.2", @@ -2562,9 +2520,9 @@ dependencies = [ [[package]] name = "event-listener-strategy" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "332f51cb23d20b0de8458b86580878211da09bcd4503cb579c225b3d124cabb3" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" dependencies = [ "event-listener 5.3.0", "pin-project-lite", @@ -2580,12 +2538,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "fake-simd" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" - [[package]] name = "fastrand" version = "1.9.0" @@ -2597,9 +2549,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "ff" @@ -2650,23 +2602,14 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flate2" -version = "1.0.28" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" +checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" dependencies = [ "crc32fast", "miniz_oxide", ] -[[package]] -name = "float-cmp" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" -dependencies = [ - "num-traits", -] - [[package]] name = "flume" version = "0.10.14" @@ -2824,7 +2767,7 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" dependencies = [ - "fastrand 2.0.2", + "fastrand 2.1.0", "futures-core", "futures-io", "parking", @@ -2900,12 +2843,17 @@ dependencies = [ ] [[package]] -name = "generic-array" -version = "0.12.4" +name = "generator" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" +checksum = "186014d53bc231d0090ef8d6f03e0920c54d85a5ed22f4f2f74315ec56cf83fb" dependencies = [ - "typenum", + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows 0.54.0", ] [[package]] @@ -2949,7 +2897,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1" dependencies = [ - "opaque-debug 0.3.1", + "opaque-debug", "polyval", ] @@ -3111,9 +3059,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.3" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash 0.8.11", "allocator-api2", @@ -3125,7 +3073,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown 0.14.3", + "hashbrown 0.14.5", ] [[package]] @@ -3200,7 +3148,7 @@ dependencies = [ "ipnet", "once_cell", "rand 0.8.5", - "socket2 0.5.6", + "socket2 0.5.7", "thiserror", "tinyvec", "tokio", @@ -3333,7 +3281,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.6", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -3425,7 +3373,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -3490,7 +3438,7 @@ dependencies = [ "smol", "system-configuration", "tokio", - "windows", + "windows 0.51.1", ] [[package]] @@ -3536,7 +3484,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", - "hashbrown 0.14.3", + "hashbrown 0.14.5", ] [[package]] @@ -3545,7 +3493,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" dependencies = [ - "generic-array 0.14.7", + "generic-array", ] [[package]] @@ -3614,7 +3562,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2 0.5.6", + "socket2 0.5.7", "widestring", "windows-sys 0.48.0", "winreg", @@ -4192,9 +4140,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.153" +version = "0.2.154" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" [[package]] name = "libgit2-sys" @@ -4569,7 +4517,7 @@ dependencies = [ "libp2p-swarm", "rand 0.8.5", "smallvec", - "socket2 0.5.6", + "socket2 0.5.7", "tokio", "tracing", "void", @@ -4674,7 +4622,7 @@ dependencies = [ "rand 0.8.5", "ring 0.16.20", "rustls", - "socket2 0.5.6", + "socket2 0.5.7", "thiserror", "tokio", "tracing", @@ -4793,7 +4741,7 @@ dependencies = [ "libc", "libp2p-core", "libp2p-identity", - "socket2 0.5.6", + "socket2 0.5.7", "tokio", "tracing", ] @@ -4865,7 +4813,7 @@ dependencies = [ "thiserror", "tracing", "yamux 0.12.1", - "yamux 0.13.1", + "yamux 0.13.2", ] [[package]] @@ -4930,9 +4878,9 @@ checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" dependencies = [ "autocfg", "scopeguard", @@ -4969,6 +4917,20 @@ dependencies = [ "value-bag", ] +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "pin-utils", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lru" version = "0.10.1" @@ -4984,7 +4946,7 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" dependencies = [ - "hashbrown 0.14.3", + "hashbrown 0.14.5", ] [[package]] @@ -4993,7 +4955,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" dependencies = [ - "hashbrown 0.14.3", + "hashbrown 0.14.5", ] [[package]] @@ -5106,9 +5068,9 @@ dependencies = [ [[package]] name = "mockall" -version = "0.11.4" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" +checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48" dependencies = [ "cfg-if", "downcast", @@ -5121,14 +5083,14 @@ dependencies = [ [[package]] name = "mockall_derive" -version = "0.11.4" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" +checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2" dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.60", ] [[package]] @@ -5435,12 +5397,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "normalize-line-endings" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" - [[package]] name = "ntapi" version = "0.4.1" @@ -5631,12 +5587,6 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" -[[package]] -name = "opaque-debug" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" - [[package]] name = "opaque-debug" version = "0.3.1" @@ -5824,9 +5774,9 @@ checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" dependencies = [ "lock_api", "parking_lot_core", @@ -5834,15 +5784,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.1", "smallvec", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] @@ -5979,7 +5929,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" dependencies = [ "atomic-waker", - "fastrand 2.0.2", + "fastrand 2.1.0", "futures-io", ] @@ -6126,7 +6076,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" dependencies = [ "cpufeatures", - "opaque-debug 0.3.1", + "opaque-debug", "universal-hash", ] @@ -6138,7 +6088,7 @@ checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25" dependencies = [ "cfg-if", "cpufeatures", - "opaque-debug 0.3.1", + "opaque-debug", "universal-hash", ] @@ -6162,16 +6112,12 @@ checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" [[package]] name = "predicates" -version = "2.1.5" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" +checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" dependencies = [ - "difflib", - "float-cmp", - "itertools 0.10.5", - "normalize-line-endings", + "anstyle", "predicates-core", - "regex", ] [[package]] @@ -6509,7 +6455,7 @@ checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" dependencies = [ "bytes 1.6.0", "libc", - "socket2 0.5.6", + "socket2 0.5.7", "tracing", "windows-sys 0.48.0", ] @@ -6663,6 +6609,8 @@ dependencies = [ "libp2p", "libp2p-identity", "libp2p-swarm-test", + "loom", + "mockall", "multihash-codetable", "pin-project", "pretty", @@ -6693,6 +6641,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "redox_users" version = "0.4.5" @@ -6867,7 +6824,7 @@ checksum = "2eca4ecc81b7f313189bf73ce724400a07da2a6dac19588b03c8bd76a2dcc251" dependencies = [ "block-buffer 0.9.0", "digest 0.9.0", - "opaque-debug 0.3.1", + "opaque-debug", ] [[package]] @@ -7006,9 +6963,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.11" +version = "0.21.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring 0.17.8", @@ -7094,6 +7051,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -7118,7 +7081,7 @@ checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" dependencies = [ "base16ct 0.1.1", "der 0.6.1", - "generic-array 0.14.7", + "generic-array", "pkcs8 0.9.0", "subtle", "zeroize", @@ -7132,7 +7095,7 @@ checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc" dependencies = [ "base16ct 0.2.0", "der 0.7.9", - "generic-array 0.14.7", + "generic-array", "pkcs8 0.10.2", "subtle", "zeroize", @@ -7172,9 +7135,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.198" +version = "1.0.199" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" +checksum = "0c9f6e76df036c77cd94996771fb40db98187f096dd0b9af39c6c6e452ba966a" dependencies = [ "serde_derive", ] @@ -7218,9 +7181,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.198" +version = "1.0.199" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" +checksum = "11bd257a6541e141e42ca6d24ae26f7714887b47e89aa739099104c7e4d3b7fc" dependencies = [ "proc-macro2", "quote", @@ -7238,9 +7201,9 @@ dependencies = [ [[package]] name = "serde_ipld_dagcbor" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb1eedfc9e48051a90d79e189dea2303b7c0df82f03e154ae85bf2ceea957972" +checksum = "ded35fbe4ab8fdec1f1d14b4daff2206b1eada4d6e708cb451d464d2d965f493" dependencies = [ "cbor4ii", "ipld-core", @@ -7363,7 +7326,7 @@ dependencies = [ "cfg-if", "cpufeatures", "digest 0.9.0", - "opaque-debug 0.3.1", + "opaque-debug", ] [[package]] @@ -7377,18 +7340,6 @@ dependencies = [ "digest 0.10.7", ] -[[package]] -name = "sha2" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a256f46ea78a0c0d9ff00077504903ac881a1dafdc20da66545699e7776b3e69" -dependencies = [ - "block-buffer 0.7.3", - "digest 0.8.1", - "fake-simd", - "opaque-debug 0.2.3", -] - [[package]] name = "sha2" version = "0.9.9" @@ -7399,7 +7350,7 @@ dependencies = [ "cfg-if", "cpufeatures", "digest 0.9.0", - "opaque-debug 0.3.1", + "opaque-debug", ] [[package]] @@ -7607,9 +7558,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" dependencies = [ "libc", "windows-sys 0.52.0", @@ -7795,7 +7746,7 @@ dependencies = [ "futures-core", "futures-io", "futures-util", - "generic-array 0.14.7", + "generic-array", "hex", "hkdf", "hmac", @@ -7910,13 +7861,13 @@ dependencies = [ [[package]] name = "sshkeys" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c926cb006a77964474a13a86aa0135ea82c9fd43e6793a1151cc54143db6637c" +checksum = "45287473d24bf7ad9ebad1aff097ad0424c16cd9430549170c3a67c5b05705bd" dependencies = [ - "base64 0.12.3", + "base64 0.22.0", "byteorder", - "sha2 0.8.2", + "sha2 0.10.8", ] [[package]] @@ -8428,7 +8379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand 2.0.2", + "fastrand 2.1.0", "rustix 0.38.34", "windows-sys 0.52.0", ] @@ -8461,9 +8412,9 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "test-log" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b319995299c65d522680decf80f2c108d85b861d81dfe340a10d16cee29d9e6" +checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93" dependencies = [ "test-log-macros", "tracing-subscriber", @@ -8471,9 +8422,9 @@ dependencies = [ [[package]] name = "test-log-macros" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8f546451eaa38373f549093fe9fd05e7d2bade739e2ddf834b9968621d60107" +checksum = "5999e24eaa32083191ba4e425deb75cdf25efefabe5aaccb7446dd0d4122a3f5" dependencies = [ "proc-macro2", "quote", @@ -8665,7 +8616,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.6", + "socket2 0.5.7", "tokio-macros", "tracing", "windows-sys 0.48.0", @@ -8778,7 +8729,7 @@ dependencies = [ "futures-io", "futures-sink", "futures-util", - "hashbrown 0.14.3", + "hashbrown 0.14.5", "pin-project-lite", "tokio", "tracing", @@ -9088,9 +9039,9 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" [[package]] name = "unicode-width" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" +checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" [[package]] name = "unicode-xid" @@ -9431,7 +9382,7 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9" dependencies = [ - "redox_syscall", + "redox_syscall 0.4.1", "wasite", "web-sys", ] @@ -9479,10 +9430,20 @@ version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" dependencies = [ - "windows-core", + "windows-core 0.51.1", "windows-targets 0.48.5", ] +[[package]] +name = "windows" +version = "0.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252e5725dbed82865af151df558e754e4a3c2c30818359eb17465f1346a1b49" +dependencies = [ + "windows-core 0.54.0", + "windows-targets 0.52.5", +] + [[package]] name = "windows-core" version = "0.51.1" @@ -9492,6 +9453,34 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-core" +version = "0.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12661b9c89351d684a50a8a643ce5f608e20243b9fb84687800163429f161d65" +dependencies = [ + "windows-result", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-result" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "749f0da9cc72d82e600d8d2e44cadd0b9eedb9038f71a1c58556ac1c5791813b" +dependencies = [ + "windows-targets 0.52.5", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -9717,9 +9706,9 @@ dependencies = [ [[package]] name = "yamux" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1d0148b89300047e72994bee99ecdabd15a9166a7b70c8b8c37c314dcc9002" +checksum = "5f97202f6b125031b95d83e01dc57292b529384f80bfae4677e4bbc10178cf72" dependencies = [ "futures", "instant", diff --git a/Cargo.toml b/Cargo.toml index 68488df8f..d0a8cf549 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ ceramic-store = { path = "./store" } cid = { version = "0.11", features = ["serde-codec"] } clap = { version = "4", features = ["derive", "env"] } clap_mangen = "0.2.2" +concurrent-queue = "2.4.0" console = { version = "0.15", default-features = false } console-subscriber = "0.2" criterion2 = "0.7.0" @@ -70,6 +71,7 @@ deadqueue = "0.2.3" derivative = "2.2" derive_more = "0.99.17" dirs-next = "2" +event-listener = "4.0.3" expect-test = "1.4.1" fastmurmur3 = "0.1.2" fnv = "1.0.7" @@ -101,12 +103,14 @@ keyed_priority_queue = "0.4.1" lazy_static = "1.4" libp2p = { version = "0.53", default-features = false } libp2p-identity = { version = "0.2", features = ["peerid", "ed25519"] } +libp2p-swarm-test = { version = "0.3.0" } +loom = { version = "0.7.1", features = ["futures"] } lru = "0.10" mime = "0.3" mime_classifier = "0.0.1" mime_guess = "2.0.4" minicbor = { version = "0.19.1", features = ["std", "derive", "half"] } -mockall = "0.11.4" +mockall = "0.12.1" multiaddr = "0.18" multibase = "0.9" multihash = { version = "0.19" } @@ -130,7 +134,7 @@ quic-rpc = { version = "0.3.2", default-features = false } rand = "0.8.5" rand_chacha = "0.3.1" rayon = "1.5.3" -recon = { path = "./recon/" } +recon = { path = "./recon" } regex = "1.7.1" relative-path = "1.7.2" reqwest = { version = "0.11.10", default-features = false } diff --git a/Makefile b/Makefile index 64cb495cc..68727640f 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ RUSTFLAGS = -D warnings --cfg tokio_unstable CARGO = RUSTFLAGS='${RUSTFLAGS}' cargo +CARGO_LOOM = RUSTFLAGS='${RUSTFLAGS} --cfg loomtest' cargo RELEASE_LEVEL ?= minor @@ -83,9 +84,9 @@ release-pr: .PHONY: test test: # Test with default features - $(CARGO) test --locked --release + $(CARGO_LOOM) test --locked --release # Test with all features - $(CARGO) test --locked --release --all-features + $(CARGO_LOOM) test --locked --release --all-features .PHONY: check-fmt check-fmt: diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 09dbb74e9..ac05a23b3 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -69,6 +69,11 @@ criterion2.workspace = true rand_chacha.workspace = true test-log.workspace = true ceramic-store.workspace = true +mockall.workspace = true +recon = { workspace = true, features = ["test-utils"]} + +[target.'cfg(loomtest)'.dependencies] +loom.workspace = true [[bench]] name = "lru_cache" diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index b14eedefc..07620c162 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -222,3 +222,153 @@ where Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use cid::Cid; + use libp2p::swarm::{ConnectionId, ToSwarm}; + use mockall::mock; + use prometheus_client::registry::Registry; + use recon::test_utils::{ + InjectedEvent, MockReconForEventId, MockReconForInterest, TestBehaviour, TestSwarm, + }; + + fn convert_behaviour_event(ev: Event) -> Option { + match ev { + Event::Ping(_ev) => None, + Event::Identify(_ev) => None, + Event::Kademlia(_ev) => None, + Event::Mdns(_ev) => None, + Event::Bitswap(_ev) => None, + Event::Autonat(_ev) => None, + Event::Relay(_ev) => None, + Event::RelayClient(_ev) => None, + Event::Dcutr(_ev) => None, + Event::PeerManager(_ev) => None, + Event::Recon(ev) => Some(ev), + Event::Void => None, + } + } + + mock! { + BitswapStore {} + + #[async_trait::async_trait] + impl iroh_bitswap::Store for BitswapStore { + async fn get_size(&self, cid: &Cid) -> Result; + async fn get(&self, cid: &Cid) -> Result; + async fn has(&self, cid: &Cid) -> Result; + async fn put(&self, block: &Block) -> Result; + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn behavior_runs() { + let mut cfg = Libp2pConfig::default(); + cfg.autonat = false; + cfg.bitswap_server = false; + cfg.bitswap_client = false; + cfg.mdns = false; + cfg.relay_client = false; + cfg.relay_server = false; + let behavior = NodeBehaviour::new( + &Keypair::generate_ed25519(), + &cfg, + None, + Some(( + MockReconForInterest::default(), + MockReconForEventId::default(), + )), + Arc::new(MockBitswapStore::default()), + Metrics::register(&mut Registry::default()), + ) + .await + .unwrap(); + let swarm = TestSwarm::from_behaviour(TestBehaviour { + inner: behavior, + convert: Box::new(|_, ev| { + if let ToSwarm::GenerateEvent(ev) = ev { + if let Some(ev) = convert_behaviour_event(ev) { + Some(ToSwarm::GenerateEvent(ev)) + } else { + None + } + } else { + None + } + }), + api: Box::new(|_, _| ()), + }); + let driver = swarm.drive(); + + tokio::time::sleep(Duration::from_secs(1)).await; + + let stats = driver.stop().await; + assert!(stats.polled >= 1); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn behavior_polled_when_using_public_api() { + let mut mock_interest = MockReconForInterest::default(); + mock_interest + .expect_clone() + .returning(|| MockReconForInterest::default()); + let mut mock_event = MockReconForEventId::default(); + mock_event + .expect_clone() + .returning(|| MockReconForEventId::default()); + let mut cfg = Libp2pConfig::default(); + cfg.autonat = false; + cfg.bitswap_server = false; + cfg.bitswap_client = false; + cfg.mdns = false; + cfg.relay_client = false; + cfg.relay_server = false; + let behavior = NodeBehaviour::new( + &Keypair::generate_ed25519(), + &cfg, + None, + Some((mock_interest, mock_event)), + Arc::new(MockBitswapStore::default()), + Metrics::register(&mut Registry::default()), + ) + .await + .unwrap(); + let swarm = TestSwarm::from_behaviour(TestBehaviour { + inner: behavior, + convert: Box::new(|_, ev| { + if let ToSwarm::GenerateEvent(ev) = ev { + if let Some(ev) = convert_behaviour_event(ev) { + Some(ToSwarm::GenerateEvent(ev)) + } else { + None + } + } else { + None + } + }), + api: Box::new(|beh, _| { + beh.handle_established_inbound_connection( + ConnectionId::new_unchecked(0), + recon::test_utils::PEER_ID.parse().unwrap(), + &"/ip4/1.2.3.4/tcp/443".parse().unwrap(), + &"/ip4/1.2.3.4/tcp/443".parse().unwrap(), + ) + .unwrap(); + }), + }); + let driver = swarm.drive(); + + tokio::time::sleep(Duration::from_secs(1)).await; + + driver + .inject(InjectedEvent::Api("connect".to_string())) + .await; + + tokio::time::sleep(Duration::from_secs(1)).await; + + let stats = driver.stop().await; + assert!(stats.polled >= 2); + } +} diff --git a/p2p/src/behaviour/ceramic_peer_manager.rs b/p2p/src/behaviour/ceramic_peer_manager.rs index 854a58299..ae00f5ba7 100644 --- a/p2p/src/behaviour/ceramic_peer_manager.rs +++ b/p2p/src/behaviour/ceramic_peer_manager.rs @@ -188,7 +188,7 @@ impl NetworkBehaviour for CeramicPeerManager { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { for (peer_id, peer) in self.ceramic_peers.iter_mut() { if let Some(mut dial_future) = peer.dial_future.take() { match dial_future.as_mut().poll_unpin(cx) { @@ -290,3 +290,62 @@ impl CeramicPeer { } } } + +#[cfg(test)] +mod tests { + use super::*; + use libp2p::core::{ConnectedPoint, Endpoint}; + use prometheus_client::registry::Registry; + use recon::test_utils::*; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn swarm_should_drive_poll() { + let metrics = Metrics::register(&mut Registry::default()); + let beh = CeramicPeerManager::new(&[], metrics).unwrap(); + let swarm = TestSwarm::from_behaviour(TestBehaviour { + inner: beh, + convert: Box::new(|_, _ev| None), + api: Box::new(|_, _| ()), + }); + + let driver = swarm.drive(); + + tokio::time::sleep(Duration::from_secs(1)).await; + + let stats = driver.stop().await; + + assert!(stats.polled >= 1); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn swarm_should_not_poll_if_event_injected() { + let metrics = Metrics::register(&mut Registry::default()); + let beh = CeramicPeerManager::new(&[], metrics).unwrap(); + let swarm = TestSwarm::from_behaviour(TestBehaviour { + inner: beh, + convert: Box::new(|_, _ev| None), + api: Box::new(|_, _| ()), + }); + + let driver = swarm.drive(); + + let conn_id = ConnectionId::new_unchecked(0); + driver + .inject(InjectedEvent::InboundConnection( + PeerId::random(), + conn_id.clone(), + ConnectedPoint::Dialer { + address: "/ip4/1.2.3.4/tcp/443".parse().unwrap(), + role_override: Endpoint::Dialer, + }, + )) + .await; + + tokio::time::sleep(Duration::from_secs(1)).await; + + let stats = driver.stop().await; + + assert!(stats.polled >= 1); + assert_eq!(*stats.handler_polled.get(&conn_id).unwrap(), 0); + } +} diff --git a/recon/Cargo.toml b/recon/Cargo.toml index e1acc3516..5795770b4 100644 --- a/recon/Cargo.toml +++ b/recon/Cargo.toml @@ -17,7 +17,9 @@ ceramic-metrics.workspace = true futures.workspace = true hex = "0.4.3" libp2p-identity.workspace = true -libp2p.workspace = true +libp2p = { workspace = true, features = ["macros"] } +libp2p-swarm-test = { workspace = true, optional = true } +mockall = { workspace = true, optional = true } multihash-codetable.workspace = true prometheus-client.workspace = true serde.workspace = true @@ -33,7 +35,8 @@ codespan-reporting = "0.11.1" expect-test.workspace = true lalrpop-util = { version = "0.20.0", features = ["lexer"] } libp2p = { workspace = true, features = ["ping"] } -libp2p-swarm-test = "0.3.0" +libp2p-swarm-test = { workspace = true } +mockall.workspace = true pin-project = "1.1.3" pretty = "0.12.1" quickcheck = "1.0.3" @@ -45,5 +48,12 @@ tokio-stream.workspace = true tokio-util.workspace = true tracing-subscriber.workspace = true +#[target.'cfg(loomtest)'.dependencies] +loom.workspace = true + [build-dependencies] lalrpop = "0.20.0" + +[features] +default = [] +test-utils = ["mockall", "libp2p-swarm-test"] diff --git a/recon/src/lib.rs b/recon/src/lib.rs index e2ccf7a87..1864fc7e4 100644 --- a/recon/src/lib.rs +++ b/recon/src/lib.rs @@ -1,14 +1,13 @@ //! Recon is a network protocol for set reconciliation -#![warn(missing_docs, missing_debug_implementations, clippy::all)] +#![warn(missing_docs, clippy::all)] pub use crate::{ client::{Client, Server}, error::Error, metrics::Metrics, recon::{ - btreestore::BTreeStore, AssociativeHash, EventIdStore, FullInterests, HashCount, - InsertResult, InterestProvider, InterestStore, Key, Range, Recon, ReconInterestProvider, - ReconItem, Store, SyncState, + AssociativeHash, EventIdStore, FullInterests, HashCount, InsertResult, InterestProvider, + InterestStore, Key, Range, Recon, ReconInterestProvider, ReconItem, Store, SyncState, }, sha256a::Sha256a, }; @@ -21,8 +20,9 @@ pub mod protocol; mod recon; mod sha256a; -#[cfg(test)] -mod tests; +/// Testing utilities related to recon +#[cfg(any(test, feature = "test-utils"))] +pub mod test_utils; /// A result type that wraps a recon Error pub type Result = std::result::Result; diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index 2e6cb57d0..0f22c5a1a 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -18,6 +18,7 @@ mod tests; mod upgrade; use ceramic_core::{EventId, Interest}; +pub(crate) use handler::Handler; use libp2p::{ core::ConnectedPoint, swarm::{ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm}, @@ -28,17 +29,22 @@ use std::{ task::Poll, time::{Duration, Instant}, }; +use std::collections::VecDeque; +use std::task::Waker; use tracing::{debug, trace, warn}; pub use crate::protocol::Recon; use crate::{ libp2p::{ - handler::{FromBehaviour, FromHandler, Handler}, + handler::{FromBehaviour, FromHandler}, stream_set::StreamSet, }, Sha256a, }; +/// Events sent to swarm +pub type ToSwarmEvent = ToSwarm; + /// Name of the Recon protocol for synchronizing interests pub const PROTOCOL_NAME_INTEREST: &str = "/ceramic/recon/0.1.0/interest"; /// Name of the Recon protocol for synchronizing models @@ -71,8 +77,8 @@ pub struct Behaviour { model: M, config: Config, peers: BTreeMap, - swarm_events_sender: tokio::sync::mpsc::Sender>, - swarm_events_receiver: tokio::sync::mpsc::Receiver>, + pending_events_waker: Option, + pending_events: VecDeque, } /// Information about a remote peer and its sync status. @@ -119,23 +125,21 @@ impl Behaviour { I: Recon, M: Recon, { - let (tx, rx) = tokio::sync::mpsc::channel(1000); Self { interest, model, config, peers: BTreeMap::new(), - swarm_events_sender: tx, - swarm_events_receiver: rx, + pending_events_waker: None, + pending_events: VecDeque::default(), } } - fn send_event(&self, event: ToSwarm) { - let tx = self.swarm_events_sender.clone(); - let _ = tokio::task::block_in_place(move || { - let handle = tokio::runtime::Handle::current(); - handle.block_on(async move { tx.send(event).await }) - }); + fn send_event(&mut self, event: ToSwarmEvent) { + if let Some(waker) = &self.pending_events_waker { + waker.wake_by_ref(); + } + self.pending_events.push_back(event); } } @@ -251,9 +255,15 @@ where &mut self, cx: &mut std::task::Context<'_>, ) -> Poll>> { - if let Poll::Ready(Some(event)) = self.swarm_events_receiver.poll_recv(cx) { - debug!(?event, "swarm event"); - return Poll::Ready(event); + match self.pending_events.pop_front() { + Some(event) => { + self.pending_events_waker = None; + debug!(?event, "sending event to swarm"); + return Poll::Ready(event); + } + None => { + self.pending_events_waker = Some(cx.waker().clone()); + } } // Check each peer and start synchronization as needed. for (peer_id, info) in &mut self.peers { diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 9a818f106..f43dd3e19 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -1,238 +1,38 @@ +use libp2p::PeerId; +use tracing::info; + use crate::{ - libp2p::{stream_set::StreamSet, PeerEvent, PeerStatus}, - AssociativeHash, BTreeStore, Error, FullInterests, HashCount, InsertResult, InterestProvider, - Key, Metrics, Recon, ReconItem, Result as ReconResult, Server, Store, + test_utils::{MockReconForEventId, MockReconForInterest, MockingType, PEER_ID}, + Error, }; - -use async_trait::async_trait; -use ceramic_core::RangeOpen; -use ceramic_metrics::init_local_tracing; -use libp2p::{metrics::Registry, PeerId, Swarm}; +use futures::future::poll_immediate; +use libp2p::core::{ConnectedPoint, Endpoint}; +use libp2p::swarm::ConnectionId; +use libp2p::swarm::{behaviour::ConnectionEstablished, NetworkBehaviour}; +use libp2p::Multiaddr; use libp2p_swarm_test::SwarmExt; -use tracing::info; - -fn start_recon(recon: Recon) -> crate::Client -where - K: Key, - H: AssociativeHash, - S: Store + Send + Sync + 'static, - I: InterestProvider + Send + Sync + 'static, -{ - let mut server = Server::new(recon); - let client = server.client(); - tokio::spawn(server.run()); - client -} - -/// An implementation of a Store that stores keys in an in-memory BTree and throws errors if desired. -#[derive(Debug)] -pub struct BTreeStoreErrors { - error: Option, - inner: BTreeStore, -} - -impl BTreeStoreErrors { - fn set_error(&mut self, error: Error) { - self.error = Some(error); - } - - fn as_error(&self) -> Result<(), Error> { - if let Some(err) = &self.error { - match err { - Error::Application { error } => Err(Error::Application { - error: anyhow::anyhow!(error.to_string()), - }), - Error::Fatal { error } => Err(Error::Fatal { - error: anyhow::anyhow!(error.to_string()), - }), - Error::Transient { error } => Err(Error::Transient { - error: anyhow::anyhow!(error.to_string()), - }), - } - } else { - Ok(()) - } - } -} - -impl Default for BTreeStoreErrors { - /// By default no errors are thrown. Use set_error to change this. - fn default() -> Self { - Self { - error: None, - inner: BTreeStore::default(), - } - } -} - -#[async_trait] -impl crate::recon::Store for BTreeStoreErrors -where - K: Key, - H: AssociativeHash, -{ - type Key = K; - type Hash = H; - - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { - self.as_error()?; - - self.inner.insert(item).await - } - - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { - self.as_error()?; - - self.inner.insert_many(items).await - } - - async fn hash_range( - &self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - ) -> ReconResult> { - self.as_error()?; - - self.inner.hash_range(left_fencepost, right_fencepost).await - } - - async fn range( - &self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - offset: usize, - limit: usize, - ) -> ReconResult + Send + 'static>> { - self.as_error()?; - - self.inner - .range(left_fencepost, right_fencepost, offset, limit) - .await - } - async fn range_with_values( - &self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - offset: usize, - limit: usize, - ) -> ReconResult)> + Send + 'static>> { - self.as_error()?; - - self.inner - .range_with_values(left_fencepost, right_fencepost, offset, limit) - .await - } - - async fn last( - &self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - ) -> ReconResult> { - self.as_error()?; - - self.inner.last(left_fencepost, right_fencepost).await - } - - async fn first_and_last( - &self, - left_fencepost: &Self::Key, - right_fencepost: &Self::Key, - ) -> ReconResult> { - self.as_error()?; - - self.inner - .first_and_last(left_fencepost, right_fencepost) - .await - } +use std::future::poll_fn; +use std::pin::pin; - async fn value_for_key(&self, key: &Self::Key) -> ReconResult>> { - self.as_error()?; - - self.inner.value_for_key(key).await - } - async fn keys_with_missing_values( - &self, - range: RangeOpen, - ) -> ReconResult> { - self.as_error()?; - - self.inner.keys_with_missing_values(range).await - } -} - -// use a hackro to avoid setting all the generic types we'd need if using functions -macro_rules! setup_test { - ($alice_store: expr, $alice_interests: expr, $bob_store: expr, $bob_interest: expr,) => {{ - let _ = init_local_tracing(); - - let alice = Recon::new( - $alice_store, - FullInterests::default(), - Metrics::register(&mut Registry::default()), - ); - let alice_client = start_recon(alice); - - let alice_interests = Recon::new( - $alice_interests, - FullInterests::default(), - Metrics::register(&mut Registry::default()), - ); - - let alice_interests_client = start_recon(alice_interests); - - let bob_interest = Recon::new( - $bob_interest, - FullInterests::default(), - Metrics::register(&mut Registry::default()), - ); - let bob_interest_client = start_recon(bob_interest); - - let bob = Recon::new( - $bob_store, - FullInterests::default(), - Metrics::register(&mut Registry::default()), - ); - let bob_client = start_recon(bob); - - let swarm1 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new( - alice_interests_client, - alice_client, - crate::libp2p::Config::default(), - ) - }); - let swarm2 = Swarm::new_ephemeral(|_| { - crate::libp2p::Behaviour::new( - bob_interest_client, - bob_client, - crate::libp2p::Config::default(), - ) - }); - - (swarm1, swarm2) - }}; -} +use crate::libp2p::{stream_set::StreamSet, PeerEvent, PeerStatus}; +use crate::libp2p::{Behaviour, Config}; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn in_sync_no_overlap() { - let (mut swarm1, mut swarm2) = setup_test!( - BTreeStoreErrors::default(), - BTreeStoreErrors::default(), - BTreeStoreErrors::default(), - BTreeStoreErrors::default(), - ); + let mut a = MockingType::None.into_swarm(); + let mut b = MockingType::None.into_swarm(); let fut = async move { - let p1 = swarm1.local_peer_id().to_owned(); - let p2 = swarm2.local_peer_id().to_owned(); + let p1 = a.swarm.local_peer_id().to_owned(); + let p2 = b.swarm.local_peer_id().to_owned(); - swarm1.listen().with_memory_addr_external().await; - swarm2.connect(&mut swarm1).await; + a.swarm.listen().with_memory_addr_external().await; + b.swarm.connect(&mut a.swarm).await; let ([p1_e1, p1_e2, p1_e3, p1_e4], [p2_e1, p2_e2, p2_e3, p2_e4]): ( [crate::libp2p::Event; 4], [crate::libp2p::Event; 4], - ) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + ) = libp2p_swarm_test::drive(&mut a.swarm, &mut b.swarm).await; assert_in_sync(p2, [p1_e1, p1_e2, p1_e3, p1_e4]); assert_in_sync(p1, [p2_e1, p2_e2, p2_e3, p2_e4]); @@ -243,25 +43,27 @@ async fn in_sync_no_overlap() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn initiator_model_error() { - let mut alice_model_store = BTreeStoreErrors::default(); - alice_model_store.set_error(Error::new_transient(anyhow::anyhow!( - "transient error should be handled" - ))); - let (mut swarm1, mut swarm2) = setup_test!( - alice_model_store, - BTreeStoreErrors::default(), - BTreeStoreErrors::default(), - BTreeStoreErrors::default(), - ); + let mut a = MockingType::Event.into_swarm(); + let mut b = MockingType::None.into_swarm(); + + // fail the model sync + let _ = { + a.event_store + .as_mock() + .lock() + .await + .expect_hash_range() + .returning(|_, _| Err(Error::new_app(anyhow::anyhow!("error")))) + }; let fut = async move { - swarm1.listen().with_memory_addr_external().await; - swarm2.connect(&mut swarm1).await; + a.swarm.listen().with_memory_addr_external().await; + b.swarm.connect(&mut a.swarm).await; let ([p1_e1, p1_e2, p1_e3, failed_peer], [p2_e1, p2_e2, p2_e3]): ( [crate::libp2p::Event; 4], [crate::libp2p::Event; 3], - ) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + ) = libp2p_swarm_test::drive(&mut a.swarm, &mut b.swarm).await; for ev in &[p1_e1, p1_e2, p1_e3, p2_e1, p2_e2, p2_e3] { info!("{:?}", ev); @@ -271,7 +73,7 @@ async fn initiator_model_error() { assert_eq!( failed_peer, crate::libp2p::Event::PeerEvent(PeerEvent { - remote_peer_id: swarm2.local_peer_id().to_owned(), + remote_peer_id: b.swarm.local_peer_id().to_owned(), status: PeerStatus::Failed }) ); @@ -282,37 +84,39 @@ async fn initiator_model_error() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn responder_model_error() { - let mut bob_model_store = BTreeStoreErrors::default(); - bob_model_store.set_error(Error::new_transient(anyhow::anyhow!( - "transient error should be handled" - ))); - let (mut swarm1, mut swarm2) = setup_test!( - BTreeStoreErrors::default(), - BTreeStoreErrors::default(), - bob_model_store, - BTreeStoreErrors::default(), - ); + let mut a = MockingType::None.into_swarm(); + let mut b = MockingType::Event.into_swarm(); + + // fail the model sync + let _ = { + b.event_store + .as_mock() + .lock() + .await + .expect_hash_range() + .returning(|_, _| Err(Error::new_app(anyhow::anyhow!("error")))) + }; let fut = async move { - swarm1.listen().with_memory_addr_external().await; - swarm2.connect(&mut swarm1).await; + a.swarm.listen().with_memory_addr_external().await; + b.swarm.connect(&mut a.swarm).await; let ([p1_e1, p1_e2, p1_e3], [p2_e1, p2_e2, p2_e3]): ( [crate::libp2p::Event; 3], [crate::libp2p::Event; 3], - ) = libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + ) = libp2p_swarm_test::drive(&mut a.swarm, &mut b.swarm).await; for ev in &[p1_e1, p1_e2, p1_e3, p2_e1, p2_e2, p2_e3] { info!("{:?}", ev); } let ([], [failed_peer]): ([crate::libp2p::Event; 0], [crate::libp2p::Event; 1]) = - libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + libp2p_swarm_test::drive(&mut a.swarm, &mut b.swarm).await; info!("{:?}", failed_peer); assert_eq!( failed_peer, crate::libp2p::Event::PeerEvent(PeerEvent { - remote_peer_id: swarm1.local_peer_id().to_owned(), + remote_peer_id: a.swarm.local_peer_id().to_owned(), status: PeerStatus::Failed }) ); @@ -365,3 +169,107 @@ fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 4]) { } ); } + +#[tokio::test(flavor = "multi_thread")] +async fn poll_not_ready_on_connection_established() { + let mut behavior = Behaviour::new( + MockReconForInterest::default(), + MockReconForEventId::default(), + Config::default(), + ); + { + let mut pin = pin!(&mut behavior); + assert!(poll_immediate(poll_fn(|cx| pin.poll(cx))).await.is_none()); + } + let address = Multiaddr::empty(); + let role_override = Endpoint::Dialer; + behavior.on_swarm_event(libp2p::swarm::FromSwarm::ConnectionEstablished( + ConnectionEstablished { + peer_id: PEER_ID.parse().unwrap(), + connection_id: ConnectionId::new_unchecked(0), + endpoint: &ConnectedPoint::Dialer { + address, + role_override, + }, + failed_addresses: &[], + other_established: 0, + }, + )); + let mut pin = pin!(&mut behavior); + assert!(poll_immediate(poll_fn(|cx| pin.poll(cx))).await.is_none()); +} + +// #[cfg(loomtest)] +mod loomtest { + use super::*; + use crate::libp2p::{ConnectionInfo, Event, PeerInfo}; + use libp2p::swarm::ToSwarm; + + #[test] + fn behaviour_poll() { + loom::model(|| { + let mut behavior = Behaviour::new( + MockReconForInterest::new(), + MockReconForEventId::new(), + Config::default(), + ); + let res = loom::future::block_on(poll_immediate(poll_fn(|cx| { + behavior.poll(cx) + }))); + assert!(res.is_none()) + }); + } + + #[test] + fn behaviour_poll_with_peer() { + loom::model(|| { + let mut behavior = Behaviour::new( + MockReconForInterest::default(), + MockReconForEventId::default(), + Config::default(), + ); + behavior.peers.insert( + PEER_ID.parse().unwrap(), + PeerInfo { + status: PeerStatus::Waiting, + connections: vec![ConnectionInfo { + id: ConnectionId::new_unchecked(0), + dialer: true, + }], + last_sync: None, + }, + ); + let res = loom::future::block_on(poll_immediate(poll_fn(|cx| { + behavior.poll(cx) + }))); + assert!(res.is_none()); + }); + } + + #[test] + fn behaviour_send() { + loom::model(|| { + let mut behavior = Behaviour::new( + MockReconForInterest::default(), + MockReconForEventId::default(), + Config::default(), + ); + let mut behavior = std::pin::Pin::new(&mut behavior); + let res = loom::future::block_on(poll_immediate(poll_fn(|cx| { + behavior.poll(cx) + }))); + assert!(res.is_none()); + + behavior.send_event(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent { + remote_peer_id: PEER_ID.parse().unwrap(), + status: PeerStatus::Waiting, + }))); + + let res = loom::future::block_on(poll_immediate(poll_fn(|cx| { + behavior.poll(cx) + }))); + + assert!(res.is_some()); + }); + } +} diff --git a/recon/src/protocol.rs b/recon/src/protocol.rs index a3f99839f..cc766e4af 100644 --- a/recon/src/protocol.rs +++ b/recon/src/protocol.rs @@ -1070,7 +1070,8 @@ where mod tests { use expect_test::expect; - use crate::{tests::AlphaNumBytes, Sha256a}; + use crate::test_utils::AlphaNumBytes; + use crate::Sha256a; use super::*; diff --git a/recon/src/recon.rs b/recon/src/recon.rs index 98771562a..60f2510d9 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -1,4 +1,3 @@ -pub mod btreestore; #[cfg(test)] pub mod tests; @@ -310,8 +309,8 @@ where /// A hash with a count of how many values produced the hash. #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct HashCount { - hash: H, - count: u64, + pub(crate) hash: H, + pub(crate) count: u64, } impl HashCount { @@ -435,12 +434,12 @@ pub trait Store { /// Insert a new key into the key space. Returns true if the key did not exist. /// The value will be updated if included - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> Result; + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> Result; /// Insert new keys into the key space. /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> Result; + async fn insert_many<'a>(&self, items: &[ReconItem<'a, Self::Key>]) -> Result; /// Return the hash of all keys in the range between left_fencepost and right_fencepost. /// Both range bounds are exclusive. @@ -592,11 +591,11 @@ where type Key = K; type Hash = H; - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> Result { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> Result { self.as_ref().insert(item).await } - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> Result { + async fn insert_many<'a>(&self, items: &[ReconItem<'a, Self::Key>]) -> Result { self.as_ref().insert_many(items).await } diff --git a/recon/src/recon/tests.rs b/recon/src/recon/tests.rs index 5ef5acaaa..5a761dbd4 100644 --- a/recon/src/recon/tests.rs +++ b/recon/src/recon/tests.rs @@ -14,7 +14,6 @@ lalrpop_util::lalrpop_mod!( pub parser, "/recon/parser.rs" ); // synthesized by LALRPOP -use anyhow::Result; use async_trait::async_trait; use ceramic_core::RangeOpen; use futures::{ready, Future, Sink, Stream}; @@ -38,12 +37,11 @@ use expect_test::{expect, expect_file, Expect}; use lalrpop_util::ParseError; use pretty::{Arena, DocAllocator, DocBuilder, Pretty}; +use crate::test_utils::{AlphaNumBytes, BTreeStore}; use crate::{ protocol::{self, InitiatorMessage, ReconMessage, ResponderMessage, ValueResponse}, recon::{FullInterests, HashCount, InterestProvider, Range, ReconItem}, - tests::AlphaNumBytes, - AssociativeHash, BTreeStore, Client, Key, Metrics, Recon, Result as ReconResult, Server, - Sha256a, Store, + AssociativeHash, Client, Key, Metrics, Recon, Result as ReconResult, Server, Sha256a, Store, }; #[derive(Clone, Default, PartialEq, Serialize, Deserialize)] @@ -1207,7 +1205,7 @@ where async fn recon_do(recon: &str) -> Sequence { async fn snapshot_state( client: Client, - ) -> Result>> { + ) -> ReconResult>> { let mut state = BTreeMap::new(); let keys: Vec = client.full_range().await?.collect(); for key in keys { diff --git a/recon/src/sha256a.rs b/recon/src/sha256a.rs index b9995bfbc..bbcbc0fbd 100644 --- a/recon/src/sha256a.rs +++ b/recon/src/sha256a.rs @@ -199,7 +199,7 @@ impl Debug for Sha256a { mod tests { use super::*; use crate::recon::AssociativeHash; - use crate::tests::AlphaNumBytes; + use crate::test_utils::AlphaNumBytes; use expect_test::expect; diff --git a/recon/src/recon/btreestore.rs b/recon/src/test_utils/btreestore.rs similarity index 96% rename from recon/src/recon/btreestore.rs rename to recon/src/test_utils/btreestore.rs index c2f6e96d3..d08ba1428 100644 --- a/recon/src/recon/btreestore.rs +++ b/recon/src/test_utils/btreestore.rs @@ -4,8 +4,8 @@ use std::{collections::BTreeMap, ops::Bound, sync::Arc}; use tokio::sync::Mutex; use crate::{ - recon::{AssociativeHash, Key, MaybeHashedKey, ReconItem, Store}, - HashCount, InsertResult, Result, + recon::{AssociativeHash, HashCount, InsertResult, Key, MaybeHashedKey, ReconItem, Store}, + Result, }; #[derive(Clone, Debug)] @@ -152,7 +152,7 @@ where type Key = K; type Hash = H; - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> Result { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> Result { let mut inner = self.inner.lock().await; let new = inner .keys @@ -165,7 +165,7 @@ where Ok(new) } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> Result { + async fn insert_many<'a>(&self, items: &[ReconItem<'a, K>]) -> Result { let mut new = vec![false; items.len()]; let mut new_val_cnt = 0; for (idx, item) in items.iter().enumerate() { diff --git a/recon/src/test_utils/mock_or_real.rs b/recon/src/test_utils/mock_or_real.rs new file mode 100644 index 000000000..70e75563b --- /dev/null +++ b/recon/src/test_utils/mock_or_real.rs @@ -0,0 +1,279 @@ +use crate::test_utils::BTreeStore; +use crate::{Key, Metrics, Range, Result, Sha256a, Store, SyncState}; +use ceramic_core::RangeOpen; +use prometheus_client::registry::Registry; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::Mutex; + +#[derive(Clone)] +pub enum MockOrRealStore +where + K: Key + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>, + S: Store + Send + Sync, +{ + Mock(Arc>), + Real(BTreeStore), +} + +#[async_trait::async_trait] +impl crate::protocol::Recon for Arc> +where + K: Key + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>, + R: crate::protocol::Recon, +{ + type Key = K; + type Hash = Sha256a; + + async fn insert(&self, key: Self::Key, value: Option>) -> Result<()> { + self.lock().await.insert(key, value).await + } + + async fn range( + &self, + left_fencepost: Self::Key, + right_fencepost: Self::Key, + offset: usize, + limit: usize, + ) -> Result> { + self.lock() + .await + .range(left_fencepost, right_fencepost, offset, limit) + .await + } + + async fn len(&self) -> Result { + self.lock().await.len().await + } + + async fn value_for_key(&self, key: Self::Key) -> Result>> { + self.lock().await.value_for_key(key).await + } + + async fn keys_with_missing_values( + &self, + range: RangeOpen, + ) -> Result> { + self.lock().await.keys_with_missing_values(range).await + } + + async fn interests(&self) -> Result>> { + self.lock().await.interests().await + } + + async fn process_interests( + &self, + interests: Vec>, + ) -> Result>> { + self.lock().await.process_interests(interests).await + } + + async fn initial_range( + &self, + interest: RangeOpen, + ) -> Result> { + self.lock().await.initial_range(interest).await + } + + async fn process_range( + &self, + range: Range, + ) -> Result<(SyncState, Vec)> { + self.lock().await.process_range(range).await + } + + fn metrics(&self) -> Metrics { + Metrics::register(&mut Registry::default()) + } +} + +#[async_trait::async_trait] +impl Store for Arc> +where + K: Key + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>, + S: Store + Send + Sync, +{ + type Key = K; + type Hash = Sha256a; + + async fn insert<'a>(&self, item: &crate::ReconItem<'a, Self::Key>) -> Result { + self.lock().await.insert(item).await + } + + async fn insert_many<'a>( + &self, + items: &[crate::ReconItem<'a, Self::Key>], + ) -> Result { + self.lock().await.insert_many(items).await + } + + async fn hash_range( + &self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + ) -> Result> { + self.lock() + .await + .hash_range(left_fencepost, right_fencepost) + .await + } + + async fn range( + &self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + offset: usize, + limit: usize, + ) -> Result + Send + 'static>> { + self.lock() + .await + .range(left_fencepost, right_fencepost, offset, limit) + .await + } + + async fn range_with_values( + &self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + offset: usize, + limit: usize, + ) -> Result)> + Send + 'static>> { + self.lock() + .await + .range_with_values(left_fencepost, right_fencepost, offset, limit) + .await + } + + async fn value_for_key(&self, key: &Self::Key) -> Result>> { + self.lock().await.value_for_key(key).await + } + + /// Report all keys in the range that are missing a value. + async fn keys_with_missing_values( + &self, + range: RangeOpen, + ) -> Result> { + self.lock().await.keys_with_missing_values(range).await + } +} + +impl MockOrRealStore +where + K: Key + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>, + S: Store + Send + Sync, +{ + pub fn as_mock(&self) -> Arc> { + match self { + Self::Mock(store) => store.clone(), + Self::Real(_) => panic!("Not a mocked store"), + } + } +} + +#[async_trait::async_trait] +impl Store for MockOrRealStore +where + K: Key + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>, + S: Store + Send + Sync, +{ + type Key = K; + type Hash = Sha256a; + + async fn insert<'a>(&self, item: &crate::ReconItem<'a, Self::Key>) -> Result { + match self { + Self::Mock(store) => store.lock().await.insert(item).await, + Self::Real(store) => store.insert(item).await, + } + } + + async fn insert_many<'a>( + &self, + items: &[crate::ReconItem<'a, Self::Key>], + ) -> Result { + match self { + Self::Mock(store) => store.lock().await.insert_many(items).await, + Self::Real(store) => store.insert_many(items).await, + } + } + + async fn hash_range( + &self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + ) -> Result> { + match self { + Self::Mock(store) => { + store + .lock() + .await + .hash_range(left_fencepost, right_fencepost) + .await + } + Self::Real(store) => store.hash_range(left_fencepost, right_fencepost).await, + } + } + + async fn range( + &self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + offset: usize, + limit: usize, + ) -> Result + Send + 'static>> { + match self { + Self::Mock(store) => { + store + .lock() + .await + .range(left_fencepost, right_fencepost, offset, limit) + .await + } + Self::Real(store) => { + store + .range(left_fencepost, right_fencepost, offset, limit) + .await + } + } + } + + async fn range_with_values( + &self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + offset: usize, + limit: usize, + ) -> Result)> + Send + 'static>> { + match self { + Self::Mock(store) => { + store + .lock() + .await + .range_with_values(left_fencepost, right_fencepost, offset, limit) + .await + } + Self::Real(store) => { + store + .range_with_values(left_fencepost, right_fencepost, offset, limit) + .await + } + } + } + + async fn value_for_key(&self, key: &Self::Key) -> Result>> { + match self { + Self::Mock(store) => store.lock().await.value_for_key(key).await, + Self::Real(store) => store.value_for_key(key).await, + } + } + + /// Report all keys in the range that are missing a value. + async fn keys_with_missing_values( + &self, + range: RangeOpen, + ) -> Result> { + match self { + Self::Mock(store) => store.lock().await.keys_with_missing_values(range).await, + Self::Real(store) => store.keys_with_missing_values(range).await, + } + } +} diff --git a/recon/src/test_utils/mocks.rs b/recon/src/test_utils/mocks.rs new file mode 100644 index 000000000..39d3493c7 --- /dev/null +++ b/recon/src/test_utils/mocks.rs @@ -0,0 +1,278 @@ +//! Testing utilities for recon +use crate::{ + protocol::Recon as ProtocolRecon, HashCount, InsertResult, + InterestProvider as ReconInterestProvider, Metrics, ReconItem, Result, Sha256a, Store, + SyncState, +}; +use ceramic_core::{EventId, Interest, RangeOpen}; +use mockall::mock; + +type MockSyncState = SyncState<::Key, Sha256a>; + +mock! { + + pub EventIdRecon {} + + impl Clone for EventIdRecon { + fn clone(&self) -> Self; + } + + #[async_trait::async_trait] + impl ProtocolRecon for EventIdRecon { + type Key = EventId; + type Hash = Sha256a; + /// Insert a new key into the key space. + async fn insert(&self, key: ::Key, value: Option>) -> Result<()>; + + /// Get all keys in the specified range + async fn range( + &self, + left_fencepost: ::Key, + right_fencepost: ::Key, + offset: usize, + limit: usize, + ) -> Result::Key>>; + + /// Reports total number of keys + async fn len(&self) -> Result; + + /// Reports if the set is empty. + async fn is_empty(&self) -> Result { + Ok(self.len().await? == 0) + } + + /// Retrieve a value associated with a recon key + async fn value_for_key(&self, key: ::Key) -> Result>>; + + /// Report all keys in the range that are missing a value + async fn keys_with_missing_values(&self, range: RangeOpen<::Key>) + -> Result::Key>>; + + /// Reports the interests of this recon instance + async fn interests(&self) -> Result::Key>>>; + + /// Computes the intersection of input interests with the local interests + async fn process_interests( + &self, + interests: Vec::Key>>, + ) -> Result::Key>>>; + + /// Compute an initial hash for the range + async fn initial_range( + &self, + interest: RangeOpen<::Key>, + ) -> Result::Key, Sha256a>>; + + /// Computes a response to a remote range + async fn process_range( + &self, + range: crate::Range<::Key, Sha256a>, + ) -> Result<(MockSyncState, Vec<::Key>)>; + + /// Create a handle to the metrics + fn metrics(&self) -> Metrics; + } +} + +/// Expose mock for recon with event id +pub type MockReconForEventId = MockEventIdRecon; + +mock! { + + pub InterestRecon {} + + impl Clone for InterestRecon { + fn clone(&self) -> Self; + } + + #[async_trait::async_trait] + impl ProtocolRecon for InterestRecon { + type Key = Interest; + type Hash = Sha256a; + /// Insert a new key into the key space. + async fn insert(&self, key: ::Key, value: Option>) -> Result<()>; + + /// Get all keys in the specified range + async fn range( + &self, + left_fencepost: ::Key, + right_fencepost: ::Key, + offset: usize, + limit: usize, + ) -> Result::Key>>; + + /// Reports total number of keys + async fn len(&self) -> Result; + + /// Reports if the set is empty. + async fn is_empty(&self) -> Result { + Ok(self.len().await? == 0) + } + + /// Retrieve a value associated with a recon key + async fn value_for_key(&self, key: ::Key) -> Result>>; + + /// Report all keys in the range that are missing a value + async fn keys_with_missing_values(&self, range: RangeOpen<::Key>) + -> Result::Key>>; + + /// Reports the interests of this recon instance + async fn interests(&self) -> Result::Key>>>; + + /// Computes the intersection of input interests with the local interests + async fn process_interests( + &self, + interests: Vec::Key>>, + ) -> Result::Key>>>; + + /// Compute an initial hash for the range + async fn initial_range( + &self, + interest: RangeOpen<::Key>, + ) -> Result::Key, Sha256a>>; + + /// Computes a response to a remote range + async fn process_range( + &self, + range: crate::Range<::Key, Sha256a>, + ) -> Result<(MockSyncState, Vec<::Key>)>; + + /// Create a handle to the metrics + fn metrics(&self) -> Metrics; + } +} + +/// Expose mock for recon with interest +pub type MockReconForInterest = MockInterestRecon; + +mock! { + pub InterestInterestProvider {} + + impl Clone for InterestInterestProvider { + fn clone(&self) -> Self; + } + + #[async_trait::async_trait] + impl ReconInterestProvider for InterestInterestProvider { + type Key = Interest; + + async fn interests(&self) -> Result>>; + } +} + +/// Expose mock for interest provider with interest +pub type MockInterestProviderForInterest = MockInterestInterestProvider; + +mock! { + pub EventIdInterestProvider {} + + impl Clone for EventIdInterestProvider { + fn clone(&self) -> Self; + } + + #[async_trait::async_trait] + impl ReconInterestProvider for EventIdInterestProvider { + type Key = EventId; + + async fn interests(&self) -> Result>>; + } +} + +/// Expose mock for interest provider with event id +pub type MockInterestProviderForEventId = MockEventIdInterestProvider; + +mock! { + pub EventIdStore {} + + impl Clone for EventIdStore { + fn clone(&self) -> Self; + } + + #[async_trait::async_trait] + impl Store for EventIdStore { + type Key = EventId; + type Hash = Sha256a; + + async fn insert<'a>(&self, item: &ReconItem<'a, EventId>) -> Result; + + async fn insert_many<'a>(&self, items: &[ReconItem<'a, EventId>]) -> Result; + + async fn hash_range( + &self, + left_fencepost: &EventId, + right_fencepost: &EventId, + ) -> Result>; + + async fn range( + &self, + left_fencepost: &EventId, + right_fencepost: &EventId, + offset: usize, + limit: usize, + ) -> Result + Send + 'static>>; + + async fn range_with_values( + &self, + left_fencepost: &EventId, + right_fencepost: &EventId, + offset: usize, + limit: usize, + ) -> Result)> + Send + 'static>>; + + async fn value_for_key(&self, key: &EventId) -> Result>>; + + async fn keys_with_missing_values(&self, range: RangeOpen) + -> Result>; + } +} + +/// Expose mock for store with event id +pub type MockStoreForEventId = MockEventIdStore; + +mock! { + pub InterestStore {} + + impl Clone for InterestStore { + fn clone(&self) -> Self; + } + + #[async_trait::async_trait] + impl Store for InterestStore { + type Key = Interest; + type Hash = Sha256a; + + async fn insert<'a>(&self, item: &ReconItem<'a, Interest>) -> Result; + + async fn insert_many<'a>(&self, items: &[ReconItem<'a, Interest>]) -> Result; + + async fn hash_range( + &self, + left_fencepost: &Interest, + right_fencepost: &Interest, + ) -> Result>; + + async fn range( + &self, + left_fencepost: &Interest, + right_fencepost: &Interest, + offset: usize, + limit: usize, + ) -> Result + Send + 'static>>; + + async fn range_with_values( + &self, + left_fencepost: &Interest, + right_fencepost: &Interest, + offset: usize, + limit: usize, + ) -> Result)> + Send + 'static>>; + + async fn value_for_key(&self, key: &Interest) -> Result>>; + + async fn keys_with_missing_values(&self, range: RangeOpen) + -> Result>; + } +} + +/// Expose mock for store with event id +pub type MockStoreForInterest = MockInterestStore; diff --git a/recon/src/tests.rs b/recon/src/test_utils/mod.rs similarity index 79% rename from recon/src/tests.rs rename to recon/src/test_utils/mod.rs index d09b6cf77..2f2089e32 100644 --- a/recon/src/tests.rs +++ b/recon/src/test_utils/mod.rs @@ -1,16 +1,33 @@ +mod btreestore; +mod mock_or_real; +mod mocks; +mod test_behaviour; +mod test_swarm; + +pub use btreestore::BTreeStore; +pub use mocks::{ + MockInterestProviderForEventId, MockInterestProviderForInterest, MockReconForEventId, + MockReconForInterest, MockStoreForEventId, MockStoreForInterest, +}; +pub use test_behaviour::{InjectedEvent, TestBehaviour}; +pub use test_swarm::{MockingType, TestSwarm}; + +use crate::libp2p::{Event, PeerEvent, PeerStatus}; +use crate::Key; +use anyhow::Result; +use libp2p::swarm::ToSwarm; use serde::de::Error; use serde::{ de::{self, Visitor}, Deserialize, Serialize, }; -use crate::Key; - /// Sequence of byte values including only ASCII alpha numeric values. #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Default)] pub struct AlphaNumBytes(Vec); impl AlphaNumBytes { + /// convert to bytes pub fn into_inner(self) -> Vec { self.0 } @@ -133,3 +150,14 @@ impl Key for AlphaNumBytes { self == &Self::min_value() || self == &Self::max_value() } } + +/// Peer ID for testing +pub const PEER_ID: &str = "12D3KooWJqb7KjjcWSC92xcSHhdGUrnJ5FJiTHHdZEW7QaLWG5X3"; + +/// Generate a peer event +pub fn peer_event() -> crate::libp2p::ToSwarmEvent { + ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent { + remote_peer_id: PEER_ID.parse().unwrap(), + status: PeerStatus::Waiting, + })) +} diff --git a/recon/src/test_utils/test_behaviour.rs b/recon/src/test_utils/test_behaviour.rs new file mode 100644 index 000000000..d48d03a7b --- /dev/null +++ b/recon/src/test_utils/test_behaviour.rs @@ -0,0 +1,325 @@ +use crate::libp2p::Event; +use libp2p::core::{ConnectedPoint, Endpoint}; +use libp2p::swarm::handler::ConnectionEvent; +use libp2p::swarm::{ + ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, FromSwarm, + NetworkBehaviour, SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, +}; +use libp2p::Multiaddr; +use libp2p_identity::PeerId; +use std::collections::HashMap; +use std::task::{Context, Poll}; + +type InEvent = ToSwarm<::ToSwarm, THandlerInEvent>; +type Convert = Box< + dyn Fn(&mut N, InEvent) -> Option>>> + + Send, +>; +type Api = Box; + +/// Wrapper a behaviour for testing +pub struct TestBehaviour +where + N: NetworkBehaviour, +{ + /// Inner behaviour to be wrapped for testing + pub inner: N, + /// How to convert inner behaviour events to swarm events + pub convert: Convert, + /// How to handle api events + pub api: Api, +} + +/// Allow events to be injected into a swarm +pub enum InjectedEvent { + /// Inject a request to perform an API call + Api(String), + /// Inject a behaviour event + BehaviourEvent(Event), + /// Inject an inbound connection + InboundConnection(PeerId, ConnectionId, ConnectedPoint), + /// Inject an outbound connection + OutboundConnection(PeerId, ConnectionId, ConnectedPoint), + /// Inject a connection closed event + ConnectionClosed(PeerId, ConnectionId, ConnectedPoint), +} + +/// Wrapper to check connection behaviour polling +pub struct WrapConnectionHandler { + pub(crate) handler: C, + pub(crate) polled: usize, +} + +impl ConnectionHandler for WrapConnectionHandler +where + C: ConnectionHandler, +{ + type FromBehaviour = C::FromBehaviour; + type ToBehaviour = C::ToBehaviour; + type InboundOpenInfo = C::InboundOpenInfo; + type OutboundOpenInfo = C::OutboundOpenInfo; + type InboundProtocol = C::InboundProtocol; + type OutboundProtocol = C::OutboundProtocol; + + fn listen_protocol(&self) -> SubstreamProtocol { + self.handler.listen_protocol() + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + self.polled += 1; + self.handler.poll(cx) + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + self.handler.on_behaviour_event(event) + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + self.handler.on_connection_event(event) + } +} + +/// Stats associated with injectors +pub struct InjectorStats { + pub polled: usize, + pub handler_polled: HashMap, +} + +/// Wrapper to allow event injection for a swarm +pub struct InjectingBehavior { + pub(crate) events: tokio::sync::mpsc::Receiver, + pub(crate) behaviour: TestBehaviour, + polled: usize, + established_inbound: + HashMap::ConnectionHandler>>, + established_outbound: + HashMap::ConnectionHandler>>, +} + +impl InjectingBehavior { + /// Create a new swarm injector + pub fn new( + events: tokio::sync::mpsc::Receiver, + behaviour: TestBehaviour, + ) -> Self { + InjectingBehavior { + events, + behaviour, + polled: 0, + established_inbound: HashMap::new(), + established_outbound: HashMap::new(), + } + } + + /// Get the number of established connections + pub fn established(&self) -> usize { + self.established_inbound.len() + self.established_outbound.len() + } + + /// Get stats associated with the injector + pub fn stats(&self) -> InjectorStats { + let mut handler_polled = HashMap::new(); + for (id, handler) in &self.established_inbound { + handler_polled.insert(*id, handler.polled); + } + for (id, handler) in &self.established_outbound { + handler_polled.insert(*id, handler.polled); + } + InjectorStats { + polled: self.polled, + handler_polled, + } + } +} + +impl NetworkBehaviour for InjectingBehavior +where + N: NetworkBehaviour, +{ + type ConnectionHandler = ::ConnectionHandler; + type ToSwarm = Event; + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.behaviour.inner.handle_pending_inbound_connection( + connection_id, + local_addr, + remote_addr, + ) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.behaviour.inner.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + self.behaviour.inner.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + self.behaviour.inner.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + ) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + self.behaviour.inner.on_swarm_event(event) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + self.behaviour + .inner + .on_connection_handler_event(peer_id, connection_id, event) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.polled += 1; + loop { + if let Poll::Ready(Some(ev)) = self.events.poll_recv(cx) { + match ev { + InjectedEvent::Api(desc) => { + (self.behaviour.api)(&mut self.behaviour.inner, &desc); + } + InjectedEvent::BehaviourEvent(ev) => { + return Poll::Ready(ToSwarm::GenerateEvent(ev)); + } + InjectedEvent::InboundConnection(peer_id, connection_id, endpoint) => { + if let Ok(handler) = + self.behaviour.inner.handle_established_inbound_connection( + connection_id, + peer_id, + &Multiaddr::empty(), + &Multiaddr::empty(), + ) + { + self.established_inbound.insert( + connection_id, + WrapConnectionHandler { handler, polled: 0 }, + ); + self.on_swarm_event(FromSwarm::ConnectionEstablished( + libp2p::swarm::behaviour::ConnectionEstablished { + peer_id, + connection_id, + endpoint: &endpoint, + failed_addresses: &[], + other_established: self.established(), + }, + )); + } + } + InjectedEvent::OutboundConnection(peer_id, connection_id, endpoint) => { + if let Ok(handler) = + self.behaviour.inner.handle_established_outbound_connection( + connection_id, + peer_id, + &Multiaddr::empty(), + endpoint.to_endpoint(), + ) + { + self.established_outbound.insert( + connection_id, + WrapConnectionHandler { handler, polled: 0 }, + ); + self.on_swarm_event(FromSwarm::ConnectionEstablished( + libp2p::swarm::behaviour::ConnectionEstablished { + peer_id, + connection_id, + endpoint: &endpoint, + failed_addresses: &[], + other_established: self.established(), + }, + )); + } + } + InjectedEvent::ConnectionClosed(peer_id, connection_id, endpoint) => { + self.on_swarm_event(FromSwarm::ConnectionClosed( + libp2p::swarm::behaviour::ConnectionClosed { + peer_id, + connection_id, + endpoint: &endpoint, + remaining_established: self.established(), + }, + )); + } + } + } else { + match self.behaviour.inner.poll(cx) { + Poll::Ready(e) => { + if let Some(e) = (self.behaviour.convert)(&mut self.behaviour.inner, e) { + return Poll::Ready(e); + } + } + Poll::Pending => return Poll::Pending, + } + } + } + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(to_swarm = "Event")] +pub struct TestNetworkBehaviour { + pub(crate) inner: InjectingBehavior, +} + +impl TestNetworkBehaviour { + pub fn stats(&self) -> InjectorStats { + self.inner.stats() + } +} diff --git a/recon/src/test_utils/test_swarm.rs b/recon/src/test_utils/test_swarm.rs new file mode 100644 index 000000000..9701aa493 --- /dev/null +++ b/recon/src/test_utils/test_swarm.rs @@ -0,0 +1,197 @@ +use crate::libp2p::Behaviour; +use crate::test_utils::mock_or_real::MockOrRealStore; +use crate::test_utils::test_behaviour::{ + InjectedEvent, InjectingBehavior, InjectorStats, TestBehaviour, TestNetworkBehaviour, +}; +use crate::test_utils::{BTreeStore, MockStoreForEventId, MockStoreForInterest}; +use crate::{Client, FullInterests, InterestProvider, Key, Metrics, Sha256a, Store}; +use ceramic_core::{EventId, Interest}; +use futures::{stream::StreamExt, FutureExt}; +use libp2p::swarm::NetworkBehaviour; +use libp2p_swarm_test::SwarmExt; +use prometheus_client::registry::Registry; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// How to mock stores in the swarm +pub enum MockingType { + /// Don't mock anything + None, + /// Mock interest store + Interest, + /// Mock event store + Event, + /// Mock both interest and event stores + All, +} + +impl MockingType { + /// If type is mocking interest + pub fn mock_interest(&self) -> bool { + match self { + Self::Interest | Self::All => true, + Self::Event | Self::None => false, + } + } + + /// If type is mocking event + pub fn mock_event(&self) -> bool { + match self { + Self::Event | Self::All => true, + Self::Interest | Self::None => false, + } + } + + /// Create a test swarm from this type + pub fn into_swarm(self) -> TestSwarm { + let event_store = if self.mock_event() { + MockOrRealStore::Mock(Arc::new(Mutex::new(MockStoreForEventId::new()))) + } else { + MockOrRealStore::Real(BTreeStore::default()) + }; + let event_recon = crate::Recon::new( + event_store.clone(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + let event_client = start_recon(event_recon); + + let interest_store = if self.mock_interest() { + MockOrRealStore::Mock(Arc::new(Mutex::new(MockStoreForInterest::new()))) + } else { + MockOrRealStore::Real(BTreeStore::default()) + }; + let interest_recon = crate::Recon::new( + interest_store.clone(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + + let interest_client = start_recon(interest_recon); + + let behaviour = Behaviour::new( + interest_client, + event_client, + crate::libp2p::Config::default(), + ); + let behaviour = TestBehaviour { + inner: behaviour, + convert: Box::new(|_, ev| Some(ev)), + api: Box::new(|_, _| ()), + }; + + let (tx_events, rx_events) = tokio::sync::mpsc::channel(100); + let swarm = libp2p::Swarm::new_ephemeral(|_| TestNetworkBehaviour { + inner: InjectingBehavior::new(rx_events, behaviour), + }); + + TestSwarm:: { + events: tx_events, + interest_store, + event_store, + swarm, + } + } +} + +/// Information about a swarm being driven +pub struct DrivenSwarm { + /// Channel to receive swarm events + events: tokio::sync::mpsc::Sender, + /// Channel to stop the swarm + stop: tokio::sync::oneshot::Sender<()>, + /// Handle to the swarm task + complete: tokio::task::JoinHandle, +} + +impl DrivenSwarm { + /// Inject an event into the swarm + pub async fn inject(&self, ev: InjectedEvent) { + let _ = self.events.send(ev).await; + } + + /// Stop the swarm + pub async fn stop(self) -> InjectorStats { + let _ = self.stop.send(()); + + self.complete.await.unwrap() + } +} + +/// Container for swarm testing objects +pub struct TestSwarm +where + N: NetworkBehaviour, +{ + /// Ability to inject events into swarm + pub events: tokio::sync::mpsc::Sender, + /// Interests used by recon client connects + pub interest_store: MockOrRealStore, + /// Event ids used by recon client connects + pub event_store: MockOrRealStore, + /// Swarm instance + pub swarm: libp2p::Swarm>, +} + +fn start_recon(recon: crate::Recon) -> Client +where + K: Key, + H: crate::AssociativeHash, + S: Store + Send + Sync + 'static, + I: InterestProvider + Send + Sync + 'static, +{ + let mut server = crate::Server::new(recon); + let client = server.client(); + tokio::spawn(server.run()); + client +} + +type StoreBehavior = Behaviour, Client>; + +impl TestSwarm +where + N: NetworkBehaviour + Send, +{ + /// Create a new swarm with behavior + pub fn from_behaviour(behaviour: TestBehaviour) -> Self { + let (tx_events, rx_events) = tokio::sync::mpsc::channel(100); + let swarm = libp2p::Swarm::new_ephemeral(|_| TestNetworkBehaviour { + inner: InjectingBehavior::new(rx_events, behaviour), + }); + Self { + events: tx_events, + interest_store: MockOrRealStore::Real(BTreeStore::default()), + event_store: MockOrRealStore::Real(BTreeStore::default()), + swarm, + } + } + + /// Drive the swarm until stopped + pub fn drive(mut self) -> DrivenSwarm { + let tx_events = self.events.clone(); + let (tx_stop, rx_stop) = tokio::sync::oneshot::channel(); + let stop_fut = async move { + let _ = rx_stop.await; + }; + let mut stop_fut = stop_fut.boxed().fuse(); + let complete = tokio::spawn(async move { + loop { + let opt_ev = futures::select! { + _ = stop_fut => break, + opt_ev = self.swarm.next() => opt_ev, + }; + if let Some(ev) = opt_ev { + tracing::info!("{:?}", ev); + } else { + break; + } + } + self.swarm.behaviour().stats() + }); + DrivenSwarm { + events: tx_events, + stop: tx_stop, + complete, + } + } +} diff --git a/store/src/metrics.rs b/store/src/metrics.rs index 4f722ca6e..25dae3e76 100644 --- a/store/src/metrics.rs +++ b/store/src/metrics.rs @@ -277,7 +277,7 @@ where type Key = K; type Hash = H; - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> ReconResult { let new_val = item.value.is_some(); let new = StoreMetricsMiddleware::::record(&self.metrics, "insert", self.store.insert(item)) @@ -286,7 +286,7 @@ where Ok(new) } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { + async fn insert_many<'a>(&self, items: &[ReconItem<'a, K>]) -> ReconResult { let res = StoreMetricsMiddleware::::record( &self.metrics, "insert_many", diff --git a/store/src/sql/event.rs b/store/src/sql/event.rs index 203fa5791..b63f8ee8f 100644 --- a/store/src/sql/event.rs +++ b/store/src/sql/event.rs @@ -335,14 +335,14 @@ impl recon::Store for SqliteEventStore { type Hash = Sha256a; /// Returns true if the key was new. The value is always updated if included - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> ReconResult { let (res, _new_val) = self.insert_item_int(item.to_owned()).await?; Ok(res) } /// Insert new keys into the key space. /// Returns true if a key did not previously exist. - async fn insert_many(&self, items: &[ReconItem<'_, EventId>]) -> ReconResult { + async fn insert_many<'a>(&self, items: &[ReconItem<'a, EventId>]) -> ReconResult { match items.len() { 0 => Ok(InsertResult::new(vec![], 0)), _ => { diff --git a/store/src/sql/interest.rs b/store/src/sql/interest.rs index b84a77248..74051fc9a 100644 --- a/store/src/sql/interest.rs +++ b/store/src/sql/interest.rs @@ -129,7 +129,7 @@ impl recon::Store for SqliteInterestStore { type Hash = Sha256a; /// Returns true if the key was new. The value is always updated if included - async fn insert(&self, item: &ReconItem<'_, Interest>) -> ReconResult { + async fn insert<'a>(&self, item: &ReconItem<'a, Interest>) -> ReconResult { // interests don't have values, if someone gives us something we throw an error but allow None/vec![] if let Some(val) = item.value { if !val.is_empty() { @@ -143,7 +143,10 @@ impl recon::Store for SqliteInterestStore { /// Insert new keys into the key space. /// Returns true if a key did not previously exist. - async fn insert_many(&self, items: &[ReconItem<'_, Interest>]) -> ReconResult { + async fn insert_many<'a>( + &self, + items: &[ReconItem<'a, Interest>], + ) -> ReconResult { match items.len() { 0 => Ok(InsertResult::new(vec![], 0)), _ => {