From e08a1a6974d021b3a2bfec031815bc1de510c5f8 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 30 Oct 2023 12:35:06 +0800 Subject: [PATCH] feat: enable dispatch event using single thread (#3828) * refactor: lib dispatch * chore: type def * chore: type def * fix: local set spawn * chore: replace tokio spawn * chore: update log * chore: boxed event * chore: tauri lock --- frontend/appflowy_tauri/src-tauri/Cargo.lock | 81 ++---- frontend/rust-lib/Cargo.lock | 131 +++++---- .../collab-integrate/src/collab_builder.rs | 5 +- frontend/rust-lib/dart-ffi/src/lib.rs | 53 ++-- .../rust-lib/event-integration/Cargo.toml | 3 +- .../event-integration/src/event_builder.rs | 7 - .../rust-lib/event-integration/src/lib.rs | 36 ++- .../event-integration/src/user_event.rs | 13 +- .../tests/database/supabase_test/helper.rs | 4 +- .../tests/database/supabase_test/test.rs | 8 +- .../tests/document/af_cloud_test/edit_test.rs | 6 +- .../tests/document/af_cloud_test/util.rs | 2 +- .../tests/document/supabase_test/edit_test.rs | 10 +- .../tests/document/supabase_test/helper.rs | 2 +- .../tests/folder/local_test/script.rs | 2 +- .../folder/local_test/subscription_test.rs | 37 ++- .../tests/folder/local_test/test.rs | 2 +- .../tests/folder/supabase_test/helper.rs | 2 +- .../tests/folder/supabase_test/test.rs | 16 +- .../tests/user/af_cloud_test/auth_test.rs | 4 +- .../tests/user/af_cloud_test/member_test.rs | 8 +- .../tests/user/local_test/auth_test.rs | 8 +- .../user/local_test/user_awareness_test.rs | 4 +- .../user/local_test/user_profile_test.rs | 43 +-- .../user/migration_test/document_test.rs | 3 +- .../tests/user/migration_test/version_test.rs | 3 +- .../tests/user/supabase_test/auth_test.rs | 23 +- .../user/supabase_test/workspace_test.rs | 2 +- .../rust-lib/event-integration/tests/util.rs | 16 +- frontend/rust-lib/flowy-core/Cargo.toml | 3 +- .../rust-lib/flowy-core/src/integrate/log.rs | 1 + frontend/rust-lib/flowy-core/src/lib.rs | 46 ++-- .../flowy-database2/src/event_handler.rs | 4 +- .../rust-lib/flowy-database2/src/manager.rs | 3 +- .../src/services/database/database_editor.rs | 5 +- .../src/services/database_view/view_editor.rs | 7 +- .../src/services/group/configuration.rs | 3 +- .../tests/database/database_editor.rs | 8 +- .../tests/database/filter_test/script.rs | 3 +- .../rust-lib/flowy-document2/src/document.rs | 5 +- .../rust-lib/flowy-folder2/src/manager.rs | 9 +- frontend/rust-lib/flowy-server/Cargo.toml | 1 + .../flowy-server/src/af_cloud/server.rs | 7 +- .../flowy-server/src/supabase/api/database.rs | 5 +- .../flowy-server/src/supabase/api/document.rs | 5 +- .../flowy-server/src/supabase/api/folder.rs | 3 +- .../flowy-server/src/supabase/api/user.rs | 7 +- .../rust-lib/flowy-user/src/event_handler.rs | 2 +- frontend/rust-lib/flowy-user/src/manager.rs | 9 +- .../flowy-user/src/services/user_workspace.rs | 3 +- frontend/rust-lib/lib-dispatch/Cargo.toml | 5 +- .../rust-lib/lib-dispatch/src/byte_trait.rs | 18 +- .../rust-lib/lib-dispatch/src/dispatcher.rs | 257 ++++++++++++++---- .../lib-dispatch/src/errors/errors.rs | 14 +- .../lib-dispatch/src/module/container.rs | 23 +- .../rust-lib/lib-dispatch/src/module/data.rs | 18 +- .../rust-lib/lib-dispatch/src/module/mod.rs | 1 + .../lib-dispatch/src/module/module.rs | 28 +- .../lib-dispatch/src/request/payload.rs | 4 +- .../lib-dispatch/src/request/request.rs | 31 ++- frontend/rust-lib/lib-dispatch/src/runtime.rs | 105 ++++++- .../lib-dispatch/src/service/boxed.rs | 52 ++-- .../lib-dispatch/src/service/handler.rs | 27 +- .../rust-lib/lib-dispatch/tests/api/module.rs | 7 +- frontend/rust-lib/lib-log/Cargo.toml | 8 +- frontend/rust-lib/lib-log/src/layer.rs | 31 +-- frontend/rust-lib/lib-log/src/lib.rs | 42 +-- 67 files changed, 806 insertions(+), 538 deletions(-) diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.lock b/frontend/appflowy_tauri/src-tauri/Cargo.lock index 545b6438e437a..38f1f5f53782d 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.lock +++ b/frontend/appflowy_tauri/src-tauri/Cargo.lock @@ -129,15 +129,6 @@ dependencies = [ "libc", ] -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi", -] - [[package]] name = "anyhow" version = "1.0.75" @@ -454,7 +445,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" dependencies = [ "borsh-derive", - "hashbrown 0.12.3", + "hashbrown 0.13.2", ] [[package]] @@ -1939,6 +1930,7 @@ dependencies = [ "flowy-task", "flowy-user", "flowy-user-deps", + "futures", "futures-core", "lib-dispatch", "lib-infra", @@ -2201,6 +2193,7 @@ dependencies = [ "hex", "hyper", "lazy_static", + "lib-dispatch", "lib-infra", "mime_guess", "parking_lot", @@ -3484,8 +3477,8 @@ dependencies = [ "tracing-appender", "tracing-bunyan-formatter", "tracing-core", - "tracing-log", - "tracing-subscriber 0.2.25", + "tracing-log 0.2.0", + "tracing-subscriber", ] [[package]] @@ -3607,7 +3600,7 @@ dependencies = [ "serde", "serde_json", "tracing", - "tracing-subscriber 0.3.17", + "tracing-subscriber", ] [[package]] @@ -3662,15 +3655,6 @@ dependencies = [ "tendril", ] -[[package]] -name = "matchers" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" -dependencies = [ - "regex-automata", -] - [[package]] name = "matchers" version = "0.1.0" @@ -6653,13 +6637,13 @@ dependencies = [ [[package]] name = "tracing-appender" -version = "0.1.2" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9965507e507f12c8901432a33e31131222abac31edd90cabbcf85cf544b7127a" +checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" dependencies = [ - "chrono", "crossbeam-channel", - "tracing-subscriber 0.2.25", + "time", + "tracing-subscriber", ] [[package]] @@ -6675,19 +6659,20 @@ dependencies = [ [[package]] name = "tracing-bunyan-formatter" -version = "0.2.6" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c408910c9b7eabc0215fe2b4a89f8ec95581a91cea1f7619f7c78caf14cbc2a1" +checksum = "b5c266b9ac83dedf0e0385ad78514949e6d89491269e7065bee51d2bb8ec7373" dependencies = [ - "chrono", + "ahash 0.8.3", "gethostname", "log", "serde", "serde_json", + "time", "tracing", "tracing-core", - "tracing-log", - "tracing-subscriber 0.2.25", + "tracing-log 0.1.3", + "tracing-subscriber", ] [[package]] @@ -6712,35 +6697,24 @@ dependencies = [ ] [[package]] -name = "tracing-serde" -version = "0.1.3" +name = "tracing-log" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ - "serde", + "log", + "once_cell", "tracing-core", ] [[package]] -name = "tracing-subscriber" -version = "0.2.25" +name = "tracing-serde" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" dependencies = [ - "ansi_term", - "chrono", - "lazy_static", - "matchers 0.0.1", - "regex", "serde", - "serde_json", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", "tracing-core", - "tracing-log", - "tracing-serde", ] [[package]] @@ -6749,16 +6723,19 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ - "matchers 0.1.0", + "matchers", "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.1.3", + "tracing-serde", ] [[package]] diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index e94ab9c832394..036d08feffa92 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -115,15 +115,6 @@ dependencies = [ "libc", ] -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi", -] - [[package]] name = "anyhow" version = "1.0.75" @@ -461,7 +452,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" dependencies = [ "borsh-derive", - "hashbrown 0.12.3", + "hashbrown 0.13.2", ] [[package]] @@ -979,7 +970,7 @@ dependencies = [ "tonic", "tracing", "tracing-core", - "tracing-subscriber 0.3.17", + "tracing-subscriber", ] [[package]] @@ -1138,7 +1129,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa", - "phf 0.8.0", + "phf 0.11.2", "smallvec", ] @@ -1759,6 +1750,7 @@ dependencies = [ "flowy-task", "flowy-user", "flowy-user-deps", + "futures", "futures-core", "lib-dispatch", "lib-infra", @@ -1900,7 +1892,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", - "tracing-subscriber 0.3.17", + "tracing-subscriber", "uuid", ] @@ -2026,6 +2018,7 @@ dependencies = [ "hex", "hyper", "lazy_static", + "lib-dispatch", "lib-infra", "mime_guess", "parking_lot", @@ -2040,7 +2033,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", - "tracing-subscriber 0.3.17", + "tracing-subscriber", "url", "uuid", "yrs", @@ -2242,9 +2235,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", "futures-sink", @@ -2252,9 +2245,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] name = "futures-executor" @@ -2280,15 +2273,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", @@ -2297,15 +2290,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" [[package]] name = "futures-timer" @@ -2315,9 +2308,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-channel", "futures-core", @@ -2984,8 +2977,8 @@ dependencies = [ "tracing-appender", "tracing-bunyan-formatter", "tracing-core", - "tracing-log", - "tracing-subscriber 0.2.25", + "tracing-log 0.2.0", + "tracing-subscriber", ] [[package]] @@ -3115,15 +3108,6 @@ dependencies = [ "tendril", ] -[[package]] -name = "matchers" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" -dependencies = [ - "regex-automata 0.1.10", -] - [[package]] name = "matchers" version = "0.1.0" @@ -3626,7 +3610,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" dependencies = [ - "phf_macros", + "phf_macros 0.8.0", "phf_shared 0.8.0", "proc-macro-hack", ] @@ -3646,6 +3630,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" dependencies = [ + "phf_macros 0.11.2", "phf_shared 0.11.2", ] @@ -3713,6 +3698,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "phf_macros" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3444646e286606587e49f3bcf1679b8cef1dc2c5ecc29ddacaffc305180d464b" +dependencies = [ + "phf_generator 0.11.2", + "phf_shared 0.11.2", + "proc-macro2", + "quote", + "syn 2.0.31", +] + [[package]] name = "phf_shared" version = "0.8.0" @@ -5604,13 +5602,13 @@ dependencies = [ [[package]] name = "tracing-appender" -version = "0.1.2" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9965507e507f12c8901432a33e31131222abac31edd90cabbcf85cf544b7127a" +checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" dependencies = [ - "chrono", "crossbeam-channel", - "tracing-subscriber 0.2.25", + "time", + "tracing-subscriber", ] [[package]] @@ -5626,19 +5624,20 @@ dependencies = [ [[package]] name = "tracing-bunyan-formatter" -version = "0.2.6" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c408910c9b7eabc0215fe2b4a89f8ec95581a91cea1f7619f7c78caf14cbc2a1" +checksum = "b5c266b9ac83dedf0e0385ad78514949e6d89491269e7065bee51d2bb8ec7373" dependencies = [ - "chrono", + "ahash 0.8.3", "gethostname", "log", "serde", "serde_json", + "time", "tracing", "tracing-core", - "tracing-log", - "tracing-subscriber 0.2.25", + "tracing-log 0.1.3", + "tracing-subscriber", ] [[package]] @@ -5663,35 +5662,24 @@ dependencies = [ ] [[package]] -name = "tracing-serde" -version = "0.1.3" +name = "tracing-log" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ - "serde", + "log", + "once_cell", "tracing-core", ] [[package]] -name = "tracing-subscriber" -version = "0.2.25" +name = "tracing-serde" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" dependencies = [ - "ansi_term", - "chrono", - "lazy_static", - "matchers 0.0.1", - "regex", "serde", - "serde_json", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", "tracing-core", - "tracing-log", - "tracing-serde", ] [[package]] @@ -5700,16 +5688,19 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ - "matchers 0.1.0", + "matchers", "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.1.3", + "tracing-serde", ] [[package]] diff --git a/frontend/rust-lib/collab-integrate/src/collab_builder.rs b/frontend/rust-lib/collab-integrate/src/collab_builder.rs index 42d8eee6256ab..9a0fedfdd9c6f 100644 --- a/frontend/rust-lib/collab-integrate/src/collab_builder.rs +++ b/frontend/rust-lib/collab-integrate/src/collab_builder.rs @@ -41,10 +41,7 @@ pub enum CollabPluginContext { pub trait CollabStorageProvider: Send + Sync + 'static { fn storage_source(&self) -> CollabSource; - fn get_plugins( - &self, - context: CollabPluginContext, - ) -> Fut>>; + fn get_plugins(&self, context: CollabPluginContext) -> Fut>>; fn is_sync_enabled(&self) -> bool; } diff --git a/frontend/rust-lib/dart-ffi/src/lib.rs b/frontend/rust-lib/dart-ffi/src/lib.rs index 256ad1d8f216f..53e2795f83962 100644 --- a/frontend/rust-lib/dart-ffi/src/lib.rs +++ b/frontend/rust-lib/dart-ffi/src/lib.rs @@ -1,9 +1,12 @@ #![allow(clippy::not_unsafe_ptr_arg_deref)] +use std::sync::Arc; use std::{ffi::CStr, os::raw::c_char}; use lazy_static::lazy_static; -use parking_lot::RwLock; +use log::error; +use parking_lot::Mutex; +use tracing::trace; use flowy_core::*; use flowy_notification::{register_notification_sender, unregister_all_notification_sender}; @@ -25,9 +28,26 @@ mod protobuf; mod util; lazy_static! { - static ref APPFLOWY_CORE: RwLock> = RwLock::new(None); + static ref APPFLOWY_CORE: MutexAppFlowyCore = MutexAppFlowyCore::new(); } +struct MutexAppFlowyCore(Arc>>); + +impl MutexAppFlowyCore { + fn new() -> Self { + Self(Arc::new(Mutex::new(None))) + } + + fn dispatcher(&self) -> Option> { + let binding = self.0.lock(); + let core = binding.as_ref(); + core.map(|core| core.event_dispatcher.clone()) + } +} + +unsafe impl Sync for MutexAppFlowyCore {} +unsafe impl Send for MutexAppFlowyCore {} + #[no_mangle] pub extern "C" fn init_sdk(path: *mut c_char) -> i64 { let c_str: &CStr = unsafe { CStr::from_ptr(path) }; @@ -36,32 +56,33 @@ pub extern "C" fn init_sdk(path: *mut c_char) -> i64 { let log_crates = vec!["flowy-ffi".to_string()]; let config = AppFlowyCoreConfig::new(path, DEFAULT_NAME.to_string()).log_filter("info", log_crates); - *APPFLOWY_CORE.write() = Some(AppFlowyCore::new(config)); + *APPFLOWY_CORE.0.lock() = Some(AppFlowyCore::new(config)); 0 } #[no_mangle] +#[allow(clippy::let_underscore_future)] pub extern "C" fn async_event(port: i64, input: *const u8, len: usize) { let request: AFPluginRequest = FFIRequest::from_u8_pointer(input, len).into(); - log::trace!( + trace!( "[FFI]: {} Async Event: {:?} with {} port", &request.id, &request.event, port ); - let dispatcher = match APPFLOWY_CORE.read().as_ref() { + let dispatcher = match APPFLOWY_CORE.dispatcher() { None => { - log::error!("sdk not init yet."); + error!("sdk not init yet."); return; }, - Some(e) => e.event_dispatcher.clone(), + Some(dispatcher) => dispatcher, }; - AFPluginDispatcher::async_send_with_callback( + AFPluginDispatcher::boxed_async_send_with_callback( dispatcher, request, move |resp: AFPluginEventResponse| { - log::trace!("[FFI]: Post data to dart through {} port", port); + trace!("[FFI]: Post data to dart through {} port", port); Box::pin(post_to_flutter(resp, port)) }, ); @@ -70,14 +91,14 @@ pub extern "C" fn async_event(port: i64, input: *const u8, len: usize) { #[no_mangle] pub extern "C" fn sync_event(input: *const u8, len: usize) -> *const u8 { let request: AFPluginRequest = FFIRequest::from_u8_pointer(input, len).into(); - log::trace!("[FFI]: {} Sync Event: {:?}", &request.id, &request.event,); + trace!("[FFI]: {} Sync Event: {:?}", &request.id, &request.event,); - let dispatcher = match APPFLOWY_CORE.read().as_ref() { + let dispatcher = match APPFLOWY_CORE.dispatcher() { None => { - log::error!("sdk not init yet."); + error!("sdk not init yet."); return forget_rust(Vec::default()); }, - Some(e) => e.event_dispatcher.clone(), + Some(dispatcher) => dispatcher, }; let _response = AFPluginDispatcher::sync_send(dispatcher, request); @@ -110,13 +131,13 @@ async fn post_to_flutter(response: AFPluginEventResponse, port: i64) { .await { Ok(_success) => { - log::trace!("[FFI]: Post data to dart success"); + trace!("[FFI]: Post data to dart success"); }, Err(e) => { if let Some(msg) = e.downcast_ref::<&str>() { - log::error!("[FFI]: {:?}", msg); + error!("[FFI]: {:?}", msg); } else { - log::error!("[FFI]: allo_isolate post panic"); + error!("[FFI]: allo_isolate post panic"); } }, } diff --git a/frontend/rust-lib/event-integration/Cargo.toml b/frontend/rust-lib/event-integration/Cargo.toml index 3c35b0ff82aaa..f571986ffc93e 100644 --- a/frontend/rust-lib/event-integration/Cargo.toml +++ b/frontend/rust-lib/event-integration/Cargo.toml @@ -53,4 +53,5 @@ zip = "0.6.6" [features] default = ["supabase_cloud_test"] dart = ["flowy-core/dart"] -supabase_cloud_test = [] \ No newline at end of file +supabase_cloud_test = [] +single_thread = ["flowy-core/single_thread"] \ No newline at end of file diff --git a/frontend/rust-lib/event-integration/src/event_builder.rs b/frontend/rust-lib/event-integration/src/event_builder.rs index a6c0209622f2e..afa4590add7f0 100644 --- a/frontend/rust-lib/event-integration/src/event_builder.rs +++ b/frontend/rust-lib/event-integration/src/event_builder.rs @@ -48,13 +48,6 @@ impl EventBuilder { self } - pub fn sync_send(mut self) -> Self { - let request = self.get_request(); - let resp = AFPluginDispatcher::sync_send(self.dispatch(), request); - self.context.response = Some(resp); - self - } - pub async fn async_send(mut self) -> Self { let request = self.get_request(); let resp = AFPluginDispatcher::async_send(self.dispatch(), request).await; diff --git a/frontend/rust-lib/event-integration/src/lib.rs b/frontend/rust-lib/event-integration/src/lib.rs index f99304eba93eb..1a9d9cb7a14e8 100644 --- a/frontend/rust-lib/event-integration/src/lib.rs +++ b/frontend/rust-lib/event-integration/src/lib.rs @@ -27,30 +27,23 @@ pub struct EventIntegrationTest { pub notification_sender: TestNotificationSender, } -impl Default for EventIntegrationTest { - fn default() -> Self { +impl EventIntegrationTest { + pub async fn new() -> Self { let temp_dir = temp_dir().join(nanoid!(6)); std::fs::create_dir_all(&temp_dir).unwrap(); - Self::new_with_user_data_path(temp_dir, nanoid!(6)) - } -} - -impl EventIntegrationTest { - pub fn new() -> Self { - Self::default() + Self::new_with_user_data_path(temp_dir, nanoid!(6)).await } - pub fn new_with_user_data_path(path: PathBuf, name: String) -> Self { + pub async fn new_with_user_data_path(path: PathBuf, name: String) -> Self { let config = AppFlowyCoreConfig::new(path.to_str().unwrap(), name).log_filter( "trace", vec![ "flowy_test".to_string(), - // "lib_dispatch".to_string() + "tokio".to_string(), + "lib_dispatch".to_string(), ], ); - let inner = std::thread::spawn(|| AppFlowyCore::new(config)) - .join() - .unwrap(); + let inner = init_core(config).await; let notification_sender = TestNotificationSender::new(); let auth_type = Arc::new(RwLock::new(AuthTypePB::Local)); register_notification_sender(notification_sender.clone()); @@ -64,6 +57,21 @@ impl EventIntegrationTest { } } +#[cfg(feature = "single_thread")] +async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore { + // let runtime = tokio::runtime::Runtime::new().unwrap(); + // let local_set = tokio::task::LocalSet::new(); + // runtime.block_on(AppFlowyCore::new(config)) + AppFlowyCore::new(config).await +} + +#[cfg(not(feature = "single_thread"))] +async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore { + std::thread::spawn(|| AppFlowyCore::new(config)) + .join() + .unwrap() +} + impl std::ops::Deref for EventIntegrationTest { type Target = AppFlowyCore; diff --git a/frontend/rust-lib/event-integration/src/user_event.rs b/frontend/rust-lib/event-integration/src/user_event.rs index 9d03b8f1e8f4b..8a180c76e3a08 100644 --- a/frontend/rust-lib/event-integration/src/user_event.rs +++ b/frontend/rust-lib/event-integration/src/user_event.rs @@ -6,6 +6,7 @@ use bytes::Bytes; use nanoid::nanoid; use protobuf::ProtobufError; use tokio::sync::broadcast::{channel, Sender}; +use tracing::error; use uuid::Uuid; use flowy_notification::entities::SubscribeObject; @@ -17,7 +18,7 @@ use flowy_user::entities::{ }; use flowy_user::errors::{FlowyError, FlowyResult}; use flowy_user::event_map::UserEvent::*; -use lib_dispatch::prelude::{AFPluginDispatcher, AFPluginRequest, ToBytes}; +use lib_dispatch::prelude::{af_spawn, AFPluginDispatcher, AFPluginRequest, ToBytes}; use crate::event_builder::EventBuilder; use crate::EventIntegrationTest; @@ -44,7 +45,7 @@ impl EventIntegrationTest { } pub async fn new_with_guest_user() -> Self { - let test = Self::default(); + let test = Self::new().await; test.sign_up_as_guest().await; test } @@ -213,7 +214,7 @@ impl TestNotificationSender { let (tx, rx) = tokio::sync::mpsc::channel::(10); let mut receiver = self.sender.subscribe(); let ty = ty.into(); - tokio::spawn(async move { + af_spawn(async move { // DatabaseNotification::DidUpdateDatabaseSnapshotState while let Ok(value) = receiver.recv().await { if value.id == id && value.ty == ty { @@ -245,7 +246,7 @@ impl TestNotificationSender { let id = id.to_string(); let (tx, rx) = tokio::sync::mpsc::channel::(10); let mut receiver = self.sender.subscribe(); - tokio::spawn(async move { + af_spawn(async move { while let Ok(value) = receiver.recv().await { if value.id == id { if let Some(payload) = value.payload { @@ -263,7 +264,9 @@ impl TestNotificationSender { } impl NotificationSender for TestNotificationSender { fn send_subject(&self, subject: SubscribeObject) -> Result<(), String> { - let _ = self.sender.send(subject); + if let Err(err) = self.sender.send(subject) { + error!("Failed to send notification: {:?}", err); + } Ok(()) } } diff --git a/frontend/rust-lib/event-integration/tests/database/supabase_test/helper.rs b/frontend/rust-lib/event-integration/tests/database/supabase_test/helper.rs index c146644da2c83..5599f78cdec2d 100644 --- a/frontend/rust-lib/event-integration/tests/database/supabase_test/helper.rs +++ b/frontend/rust-lib/event-integration/tests/database/supabase_test/helper.rs @@ -22,13 +22,13 @@ pub struct FlowySupabaseDatabaseTest { impl FlowySupabaseDatabaseTest { #[allow(dead_code)] pub async fn new_with_user(uuid: String) -> Option { - let inner = FlowySupabaseTest::new()?; + let inner = FlowySupabaseTest::new().await?; inner.supabase_sign_up_with_uuid(&uuid, None).await.unwrap(); Some(Self { uuid, inner }) } pub async fn new_with_new_user() -> Option { - let inner = FlowySupabaseTest::new()?; + let inner = FlowySupabaseTest::new().await?; let uuid = uuid::Uuid::new_v4().to_string(); let _ = inner.supabase_sign_up_with_uuid(&uuid, None).await.unwrap(); Some(Self { uuid, inner }) diff --git a/frontend/rust-lib/event-integration/tests/database/supabase_test/test.rs b/frontend/rust-lib/event-integration/tests/database/supabase_test/test.rs index f5867c34f7f29..6877e511c2281 100644 --- a/frontend/rust-lib/event-integration/tests/database/supabase_test/test.rs +++ b/frontend/rust-lib/event-integration/tests/database/supabase_test/test.rs @@ -14,11 +14,11 @@ use crate::util::receive_with_timeout; async fn supabase_initial_database_snapshot_test() { if let Some(test) = FlowySupabaseDatabaseTest::new_with_new_user().await { let (view, database) = test.create_database().await; - let mut rx = test + let rx = test .notification_sender .subscribe::(&database.id, DidUpdateDatabaseSnapshotState); - receive_with_timeout(&mut rx, Duration::from_secs(30)) + receive_with_timeout(rx, Duration::from_secs(30)) .await .unwrap(); @@ -51,10 +51,10 @@ async fn supabase_edit_database_test() { .await; // wait all updates are send to the remote - let mut rx = test + let rx = test .notification_sender .subscribe_with_condition::(&database.id, |pb| pb.is_finish); - receive_with_timeout(&mut rx, Duration::from_secs(30)) + receive_with_timeout(rx, Duration::from_secs(30)) .await .unwrap(); diff --git a/frontend/rust-lib/event-integration/tests/document/af_cloud_test/edit_test.rs b/frontend/rust-lib/event-integration/tests/document/af_cloud_test/edit_test.rs index d261cc26f07b5..e5a08c8506e90 100644 --- a/frontend/rust-lib/event-integration/tests/document/af_cloud_test/edit_test.rs +++ b/frontend/rust-lib/event-integration/tests/document/af_cloud_test/edit_test.rs @@ -12,17 +12,17 @@ async fn af_cloud_edit_document_test() { let document_id = test.create_document().await; let cloned_test = test.clone(); let cloned_document_id = document_id.clone(); - tokio::spawn(async move { + test.inner.dispatcher().spawn(async move { cloned_test .insert_document_text(&cloned_document_id, "hello world", 0) .await; }); // wait all update are send to the remote - let mut rx = test + let rx = test .notification_sender .subscribe_with_condition::(&document_id, |pb| pb.is_finish); - receive_with_timeout(&mut rx, Duration::from_secs(15)) + receive_with_timeout(rx, Duration::from_secs(15)) .await .unwrap(); diff --git a/frontend/rust-lib/event-integration/tests/document/af_cloud_test/util.rs b/frontend/rust-lib/event-integration/tests/document/af_cloud_test/util.rs index 4abf56fa070cb..eef1023f48b0f 100644 --- a/frontend/rust-lib/event-integration/tests/document/af_cloud_test/util.rs +++ b/frontend/rust-lib/event-integration/tests/document/af_cloud_test/util.rs @@ -8,7 +8,7 @@ pub struct AFCloudDocumentTest { impl AFCloudDocumentTest { pub async fn new() -> Option { - let inner = AFCloudTest::new()?; + let inner = AFCloudTest::new().await?; let email = generate_test_email(); let _ = inner.af_cloud_sign_in_with_email(&email).await.unwrap(); Some(Self { inner }) diff --git a/frontend/rust-lib/event-integration/tests/document/supabase_test/edit_test.rs b/frontend/rust-lib/event-integration/tests/document/supabase_test/edit_test.rs index a4ad36d350f21..01f3ed5c215bd 100644 --- a/frontend/rust-lib/event-integration/tests/document/supabase_test/edit_test.rs +++ b/frontend/rust-lib/event-integration/tests/document/supabase_test/edit_test.rs @@ -14,17 +14,17 @@ async fn supabase_document_edit_sync_test() { let cloned_test = test.clone(); let cloned_document_id = document_id.clone(); - tokio::spawn(async move { + test.inner.dispatcher().spawn(async move { cloned_test .insert_document_text(&cloned_document_id, "hello world", 0) .await; }); // wait all update are send to the remote - let mut rx = test + let rx = test .notification_sender .subscribe_with_condition::(&document_id, |pb| pb.is_finish); - receive_with_timeout(&mut rx, Duration::from_secs(30)) + receive_with_timeout(rx, Duration::from_secs(30)) .await .unwrap(); @@ -47,10 +47,10 @@ async fn supabase_document_edit_sync_test2() { } // wait all update are send to the remote - let mut rx = test + let rx = test .notification_sender .subscribe_with_condition::(&document_id, |pb| pb.is_finish); - receive_with_timeout(&mut rx, Duration::from_secs(30)) + receive_with_timeout(rx, Duration::from_secs(30)) .await .unwrap(); diff --git a/frontend/rust-lib/event-integration/tests/document/supabase_test/helper.rs b/frontend/rust-lib/event-integration/tests/document/supabase_test/helper.rs index 7244a33a34f46..5ddd359a3c4a1 100644 --- a/frontend/rust-lib/event-integration/tests/document/supabase_test/helper.rs +++ b/frontend/rust-lib/event-integration/tests/document/supabase_test/helper.rs @@ -13,7 +13,7 @@ pub struct FlowySupabaseDocumentTest { impl FlowySupabaseDocumentTest { pub async fn new() -> Option { - let inner = FlowySupabaseTest::new()?; + let inner = FlowySupabaseTest::new().await?; let uuid = uuid::Uuid::new_v4().to_string(); let _ = inner.supabase_sign_up_with_uuid(&uuid, None).await; Some(Self { inner }) diff --git a/frontend/rust-lib/event-integration/tests/folder/local_test/script.rs b/frontend/rust-lib/event-integration/tests/folder/local_test/script.rs index 09a11a94f8651..82d0db03d0d4e 100644 --- a/frontend/rust-lib/event-integration/tests/folder/local_test/script.rs +++ b/frontend/rust-lib/event-integration/tests/folder/local_test/script.rs @@ -75,7 +75,7 @@ pub struct FolderTest { impl FolderTest { pub async fn new() -> Self { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let _ = sdk.init_anon_user().await; let workspace = create_workspace(&sdk, "FolderWorkspace", "Folder test workspace").await; let parent_view = create_app(&sdk, &workspace.id, "Folder App", "Folder test app").await; diff --git a/frontend/rust-lib/event-integration/tests/folder/local_test/subscription_test.rs b/frontend/rust-lib/event-integration/tests/folder/local_test/subscription_test.rs index 2df246b212acd..0ad2deaa0f1f6 100644 --- a/frontend/rust-lib/event-integration/tests/folder/local_test/subscription_test.rs +++ b/frontend/rust-lib/event-integration/tests/folder/local_test/subscription_test.rs @@ -18,19 +18,19 @@ use crate::util::receive_with_timeout; async fn create_child_view_in_workspace_subscription_test() { let test = EventIntegrationTest::new_with_guest_user().await; let workspace = test.get_current_workspace().await.workspace; - let mut rx = test + let rx = test .notification_sender .subscribe::(&workspace.id, FolderNotification::DidUpdateWorkspaceViews); let cloned_test = test.clone(); let cloned_workspace_id = workspace.id.clone(); - tokio::spawn(async move { + test.inner.dispatcher().spawn(async move { cloned_test .create_view(&cloned_workspace_id, "workspace child view".to_string()) .await; }); - let views = receive_with_timeout(&mut rx, Duration::from_secs(30)) + let views = receive_with_timeout(rx, Duration::from_secs(30)) .await .unwrap() .items; @@ -43,14 +43,14 @@ async fn create_child_view_in_view_subscription_test() { let test = EventIntegrationTest::new_with_guest_user().await; let mut workspace = test.get_current_workspace().await.workspace; let workspace_child_view = workspace.views.pop().unwrap(); - let mut rx = test.notification_sender.subscribe::( + let rx = test.notification_sender.subscribe::( &workspace_child_view.id, FolderNotification::DidUpdateChildViews, ); let cloned_test = test.clone(); let child_view_id = workspace_child_view.id.clone(); - tokio::spawn(async move { + test.inner.dispatcher().spawn(async move { cloned_test .create_view( &child_view_id, @@ -59,7 +59,7 @@ async fn create_child_view_in_view_subscription_test() { .await; }); - let update = receive_with_timeout(&mut rx, Duration::from_secs(30)) + let update = receive_with_timeout(rx, Duration::from_secs(30)) .await .unwrap(); @@ -74,20 +74,29 @@ async fn create_child_view_in_view_subscription_test() { async fn delete_view_subscription_test() { let test = EventIntegrationTest::new_with_guest_user().await; let workspace = test.get_current_workspace().await.workspace; - let mut rx = test + let rx = test .notification_sender .subscribe::(&workspace.id, FolderNotification::DidUpdateChildViews); let cloned_test = test.clone(); let delete_view_id = workspace.views.first().unwrap().id.clone(); let cloned_delete_view_id = delete_view_id.clone(); - tokio::spawn(async move { - cloned_test.delete_view(&cloned_delete_view_id).await; - }); + test + .inner + .dispatcher() + .spawn(async move { + cloned_test.delete_view(&cloned_delete_view_id).await; + }) + .await + .unwrap(); - let update = receive_with_timeout(&mut rx, Duration::from_secs(30)) + let update = test + .inner + .dispatcher() + .run_until(receive_with_timeout(rx, Duration::from_secs(30))) .await .unwrap(); + assert_eq!(update.delete_child_views.len(), 1); assert_eq!(update.delete_child_views[0], delete_view_id); } @@ -96,7 +105,7 @@ async fn delete_view_subscription_test() { async fn update_view_subscription_test() { let test = EventIntegrationTest::new_with_guest_user().await; let mut workspace = test.get_current_workspace().await.workspace; - let mut rx = test + let rx = test .notification_sender .subscribe::(&workspace.id, FolderNotification::DidUpdateChildViews); @@ -105,7 +114,7 @@ async fn update_view_subscription_test() { assert!(!view.is_favorite); let update_view_id = view.id.clone(); - tokio::spawn(async move { + test.inner.dispatcher().spawn(async move { cloned_test .update_view(UpdateViewPayloadPB { view_id: update_view_id, @@ -116,7 +125,7 @@ async fn update_view_subscription_test() { .await; }); - let update = receive_with_timeout(&mut rx, Duration::from_secs(30)) + let update = receive_with_timeout(rx, Duration::from_secs(30)) .await .unwrap(); assert_eq!(update.update_child_views.len(), 1); diff --git a/frontend/rust-lib/event-integration/tests/folder/local_test/test.rs b/frontend/rust-lib/event-integration/tests/folder/local_test/test.rs index 87cc6b82d9e6c..82215040d6207 100644 --- a/frontend/rust-lib/event-integration/tests/folder/local_test/test.rs +++ b/frontend/rust-lib/event-integration/tests/folder/local_test/test.rs @@ -466,7 +466,7 @@ async fn move_view_event_after_delete_view_test2() { #[tokio::test] async fn create_parent_view_with_invalid_name() { for (name, code) in invalid_workspace_name_test_case() { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let request = CreateWorkspacePayloadPB { name, desc: "".to_owned(), diff --git a/frontend/rust-lib/event-integration/tests/folder/supabase_test/helper.rs b/frontend/rust-lib/event-integration/tests/folder/supabase_test/helper.rs index e86257c32d0c5..eac5cb418a3fd 100644 --- a/frontend/rust-lib/event-integration/tests/folder/supabase_test/helper.rs +++ b/frontend/rust-lib/event-integration/tests/folder/supabase_test/helper.rs @@ -19,7 +19,7 @@ pub struct FlowySupabaseFolderTest { impl FlowySupabaseFolderTest { pub async fn new() -> Option { - let inner = FlowySupabaseTest::new()?; + let inner = FlowySupabaseTest::new().await?; let uuid = uuid::Uuid::new_v4().to_string(); let _ = inner.supabase_sign_up_with_uuid(&uuid, None).await; Some(Self { inner }) diff --git a/frontend/rust-lib/event-integration/tests/folder/supabase_test/test.rs b/frontend/rust-lib/event-integration/tests/folder/supabase_test/test.rs index fb4ea0f361c3f..d04937d70dad8 100644 --- a/frontend/rust-lib/event-integration/tests/folder/supabase_test/test.rs +++ b/frontend/rust-lib/event-integration/tests/folder/supabase_test/test.rs @@ -34,11 +34,11 @@ async fn supabase_decrypt_folder_data_test() { .create_view(&workspace_id, "encrypt view".to_string()) .await; - let mut rx = test + let rx = test .notification_sender .subscribe_with_condition::(&workspace_id, |pb| pb.is_finish); - receive_with_timeout(&mut rx, Duration::from_secs(10)) + receive_with_timeout(rx, Duration::from_secs(10)) .await .unwrap(); let folder_data = get_folder_data_from_server(&workspace_id, secret) @@ -59,10 +59,10 @@ async fn supabase_decrypt_with_invalid_secret_folder_data_test() { test .create_view(&workspace_id, "encrypt view".to_string()) .await; - let mut rx = test + let rx = test .notification_sender .subscribe_with_condition::(&workspace_id, |pb| pb.is_finish); - receive_with_timeout(&mut rx, Duration::from_secs(10)) + receive_with_timeout(rx, Duration::from_secs(10)) .await .unwrap(); @@ -75,10 +75,10 @@ async fn supabase_decrypt_with_invalid_secret_folder_data_test() { async fn supabase_folder_snapshot_test() { if let Some(test) = FlowySupabaseFolderTest::new().await { let workspace_id = test.get_current_workspace().await.workspace.id; - let mut rx = test + let rx = test .notification_sender .subscribe::(&workspace_id, DidUpdateFolderSnapshotState); - receive_with_timeout(&mut rx, Duration::from_secs(10)) + receive_with_timeout(rx, Duration::from_secs(10)) .await .unwrap(); @@ -104,11 +104,11 @@ async fn supabase_initial_folder_snapshot_test2() { .create_view(&workspace_id, "supabase test view3".to_string()) .await; - let mut rx = test + let rx = test .notification_sender .subscribe_with_condition::(&workspace_id, |pb| pb.is_finish); - receive_with_timeout(&mut rx, Duration::from_secs(10)) + receive_with_timeout(rx, Duration::from_secs(10)) .await .unwrap(); diff --git a/frontend/rust-lib/event-integration/tests/user/af_cloud_test/auth_test.rs b/frontend/rust-lib/event-integration/tests/user/af_cloud_test/auth_test.rs index a37eaa1276a96..04c4666e4f00f 100644 --- a/frontend/rust-lib/event-integration/tests/user/af_cloud_test/auth_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/af_cloud_test/auth_test.rs @@ -6,7 +6,7 @@ use crate::util::{generate_test_email, get_af_cloud_config}; #[tokio::test] async fn af_cloud_sign_up_test() { if get_af_cloud_config().is_some() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let email = generate_test_email(); let user = test.af_cloud_sign_in_with_email(&email).await.unwrap(); assert_eq!(user.email, email); @@ -16,7 +16,7 @@ async fn af_cloud_sign_up_test() { #[tokio::test] async fn af_cloud_update_user_metadata() { if get_af_cloud_config().is_some() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let user = test.af_cloud_sign_up().await; let old_profile = test.get_user_profile().await.unwrap(); diff --git a/frontend/rust-lib/event-integration/tests/user/af_cloud_test/member_test.rs b/frontend/rust-lib/event-integration/tests/user/af_cloud_test/member_test.rs index 6020c8a56ae8c..577323eae34c8 100644 --- a/frontend/rust-lib/event-integration/tests/user/af_cloud_test/member_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/af_cloud_test/member_test.rs @@ -5,10 +5,10 @@ use crate::util::get_af_cloud_config; #[tokio::test] async fn af_cloud_add_workspace_member_test() { if get_af_cloud_config().is_some() { - let test_1 = EventIntegrationTest::new(); + let test_1 = EventIntegrationTest::new().await; let user_1 = test_1.af_cloud_sign_up().await; - let test_2 = EventIntegrationTest::new(); + let test_2 = EventIntegrationTest::new().await; let user_2 = test_2.af_cloud_sign_up().await; let members = test_1.get_workspace_members(&user_1.workspace_id).await; @@ -29,10 +29,10 @@ async fn af_cloud_add_workspace_member_test() { #[tokio::test] async fn af_cloud_delete_workspace_member_test() { if get_af_cloud_config().is_some() { - let test_1 = EventIntegrationTest::new(); + let test_1 = EventIntegrationTest::new().await; let user_1 = test_1.af_cloud_sign_up().await; - let test_2 = EventIntegrationTest::new(); + let test_2 = EventIntegrationTest::new().await; let user_2 = test_2.af_cloud_sign_up().await; test_1 diff --git a/frontend/rust-lib/event-integration/tests/user/local_test/auth_test.rs b/frontend/rust-lib/event-integration/tests/user/local_test/auth_test.rs index c5ee9c5560610..9aa35dd344b08 100644 --- a/frontend/rust-lib/event-integration/tests/user/local_test/auth_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/local_test/auth_test.rs @@ -9,7 +9,7 @@ use crate::user::local_test::helper::*; #[tokio::test] async fn sign_up_with_invalid_email() { for email in invalid_email_test_case() { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let request = SignUpPayloadPB { email: email.to_string(), name: valid_name(), @@ -33,7 +33,7 @@ async fn sign_up_with_invalid_email() { } #[tokio::test] async fn sign_up_with_long_password() { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let request = SignUpPayloadPB { email: unique_email(), name: valid_name(), @@ -58,7 +58,7 @@ async fn sign_up_with_long_password() { #[tokio::test] async fn sign_in_with_invalid_email() { for email in invalid_email_test_case() { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let request = SignInPayloadPB { email: email.to_string(), password: login_password(), @@ -84,7 +84,7 @@ async fn sign_in_with_invalid_email() { #[tokio::test] async fn sign_in_with_invalid_password() { for password in invalid_password_test_case() { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let request = SignInPayloadPB { email: unique_email(), diff --git a/frontend/rust-lib/event-integration/tests/user/local_test/user_awareness_test.rs b/frontend/rust-lib/event-integration/tests/user/local_test/user_awareness_test.rs index 68bae5c7a4b7b..8e1223f566429 100644 --- a/frontend/rust-lib/event-integration/tests/user/local_test/user_awareness_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/local_test/user_awareness_test.rs @@ -6,8 +6,8 @@ use flowy_user::entities::{ReminderPB, RepeatedReminderPB}; use flowy_user::event_map::UserEvent::*; #[tokio::test] -async fn user_update_with_name() { - let sdk = EventIntegrationTest::new(); +async fn user_update_with_reminder() { + let sdk = EventIntegrationTest::new().await; let _ = sdk.sign_up_as_guest().await; let mut meta = HashMap::new(); meta.insert("object_id".to_string(), "".to_string()); diff --git a/frontend/rust-lib/event-integration/tests/user/local_test/user_profile_test.rs b/frontend/rust-lib/event-integration/tests/user/local_test/user_profile_test.rs index 60f5cc84ccf29..7418b267afd45 100644 --- a/frontend/rust-lib/event-integration/tests/user/local_test/user_profile_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/local_test/user_profile_test.rs @@ -10,7 +10,7 @@ use crate::user::local_test::helper::*; #[tokio::test] async fn user_profile_get_failed() { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let result = EventBuilder::new(sdk) .event(GetUserProfile) .async_send() @@ -21,11 +21,12 @@ async fn user_profile_get_failed() { #[tokio::test] async fn anon_user_profile_get() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let user_profile = test.init_anon_user().await; let user = EventBuilder::new(test.clone()) .event(GetUserProfile) - .sync_send() + .async_send() + .await .parse::(); assert_eq!(user_profile.id, user.id); assert_eq!(user_profile.openai_key, user.openai_key); @@ -36,18 +37,20 @@ async fn anon_user_profile_get() { #[tokio::test] async fn user_update_with_name() { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let user = sdk.init_anon_user().await; let new_name = "hello_world".to_owned(); let request = UpdateUserProfilePayloadPB::new(user.id).name(&new_name); let _ = EventBuilder::new(sdk.clone()) .event(UpdateUserProfile) .payload(request) - .sync_send(); + .async_send() + .await; let user_profile = EventBuilder::new(sdk.clone()) .event(GetUserProfile) - .sync_send() + .async_send() + .await .parse::(); assert_eq!(user_profile.name, new_name,); @@ -55,7 +58,7 @@ async fn user_update_with_name() { #[tokio::test] async fn user_update_with_ai_key() { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let user = sdk.init_anon_user().await; let openai_key = "openai_key".to_owned(); let stability_ai_key = "stability_ai_key".to_owned(); @@ -65,11 +68,13 @@ async fn user_update_with_ai_key() { let _ = EventBuilder::new(sdk.clone()) .event(UpdateUserProfile) .payload(request) - .sync_send(); + .async_send() + .await; let user_profile = EventBuilder::new(sdk.clone()) .event(GetUserProfile) - .sync_send() + .async_send() + .await .parse::(); assert_eq!(user_profile.openai_key, openai_key,); @@ -78,17 +83,19 @@ async fn user_update_with_ai_key() { #[tokio::test] async fn anon_user_update_with_email() { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let user = sdk.init_anon_user().await; let new_email = format!("{}@gmail.com", nanoid!(6)); let request = UpdateUserProfilePayloadPB::new(user.id).email(&new_email); let _ = EventBuilder::new(sdk.clone()) .event(UpdateUserProfile) .payload(request) - .sync_send(); + .async_send() + .await; let user_profile = EventBuilder::new(sdk.clone()) .event(GetUserProfile) - .sync_send() + .async_send() + .await .parse::(); // When the user is anonymous, the email is empty no matter what you set @@ -97,7 +104,7 @@ async fn anon_user_update_with_email() { #[tokio::test] async fn user_update_with_invalid_email() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let user = test.init_anon_user().await; for email in invalid_email_test_case() { let request = UpdateUserProfilePayloadPB::new(user.id).email(&email); @@ -105,7 +112,8 @@ async fn user_update_with_invalid_email() { EventBuilder::new(test.clone()) .event(UpdateUserProfile) .payload(request) - .sync_send() + .async_send() + .await .error() .unwrap() .code, @@ -116,7 +124,7 @@ async fn user_update_with_invalid_email() { #[tokio::test] async fn user_update_with_invalid_password() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let user = test.init_anon_user().await; for password in invalid_password_test_case() { let request = UpdateUserProfilePayloadPB::new(user.id).password(&password); @@ -133,13 +141,14 @@ async fn user_update_with_invalid_password() { #[tokio::test] async fn user_update_with_invalid_name() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let user = test.init_anon_user().await; let request = UpdateUserProfilePayloadPB::new(user.id).name(""); assert!(EventBuilder::new(test.clone()) .event(UpdateUserProfile) .payload(request) - .sync_send() + .async_send() + .await .error() .is_some()) } diff --git a/frontend/rust-lib/event-integration/tests/user/migration_test/document_test.rs b/frontend/rust-lib/event-integration/tests/user/migration_test/document_test.rs index 9f3100b7ca4b2..f15b5c3fddc47 100644 --- a/frontend/rust-lib/event-integration/tests/user/migration_test/document_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/migration_test/document_test.rs @@ -11,7 +11,8 @@ async fn migrate_historical_empty_document_test() { "historical_empty_document", ) .unwrap(); - let test = EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()); + let test = + EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()).await; let views = test.get_all_workspace_views().await; assert_eq!(views.len(), 3); diff --git a/frontend/rust-lib/event-integration/tests/user/migration_test/version_test.rs b/frontend/rust-lib/event-integration/tests/user/migration_test/version_test.rs index ccd1b25e8c4a5..dcaed72a0dbd4 100644 --- a/frontend/rust-lib/event-integration/tests/user/migration_test/version_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/migration_test/version_test.rs @@ -11,7 +11,8 @@ async fn migrate_020_historical_empty_document_test() { "020_historical_user_data", ) .unwrap(); - let test = EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()); + let test = + EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()).await; let mut views = test.get_all_workspace_views().await; assert_eq!(views.len(), 1); diff --git a/frontend/rust-lib/event-integration/tests/user/supabase_test/auth_test.rs b/frontend/rust-lib/event-integration/tests/user/supabase_test/auth_test.rs index d2bf889a8351a..5375d5f33952a 100644 --- a/frontend/rust-lib/event-integration/tests/user/supabase_test/auth_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/supabase_test/auth_test.rs @@ -13,7 +13,7 @@ use event_integration::event_builder::EventBuilder; use event_integration::EventIntegrationTest; use flowy_core::DEFAULT_NAME; use flowy_encrypt::decrypt_text; -use flowy_server::supabase::define::{USER_EMAIL, USER_UUID}; +use flowy_server::supabase::define::{USER_DEVICE_ID, USER_EMAIL, USER_UUID}; use flowy_user::entities::{AuthTypePB, OauthSignInPB, UpdateUserProfilePayloadPB, UserProfilePB}; use flowy_user::errors::ErrorCode; use flowy_user::event_map::UserEvent::*; @@ -23,13 +23,14 @@ use crate::util::*; #[tokio::test] async fn third_party_sign_up_test() { if get_supabase_config().is_some() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let mut map = HashMap::new(); map.insert(USER_UUID.to_string(), uuid::Uuid::new_v4().to_string()); map.insert( USER_EMAIL.to_string(), format!("{}@appflowy.io", nanoid!(6)), ); + map.insert(USER_DEVICE_ID.to_string(), uuid::Uuid::new_v4().to_string()); let payload = OauthSignInPB { map, auth_type: AuthTypePB::Supabase, @@ -48,7 +49,7 @@ async fn third_party_sign_up_test() { #[tokio::test] async fn third_party_sign_up_with_encrypt_test() { if get_supabase_config().is_some() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; test.supabase_party_sign_up().await; let user_profile = test.get_user_profile().await.unwrap(); assert!(user_profile.encryption_sign.is_empty()); @@ -65,11 +66,12 @@ async fn third_party_sign_up_with_encrypt_test() { #[tokio::test] async fn third_party_sign_up_with_duplicated_uuid() { if get_supabase_config().is_some() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let email = format!("{}@appflowy.io", nanoid!(6)); let mut map = HashMap::new(); map.insert(USER_UUID.to_string(), uuid::Uuid::new_v4().to_string()); map.insert(USER_EMAIL.to_string(), email.clone()); + map.insert(USER_DEVICE_ID.to_string(), uuid::Uuid::new_v4().to_string()); let response_1 = EventBuilder::new(test.clone()) .event(OauthSignIn) @@ -98,7 +100,7 @@ async fn third_party_sign_up_with_duplicated_uuid() { #[tokio::test] async fn third_party_sign_up_with_duplicated_email() { if get_supabase_config().is_some() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let email = format!("{}@appflowy.io", nanoid!(6)); test .supabase_sign_up_with_uuid(&uuid::Uuid::new_v4().to_string(), Some(email.clone())) @@ -138,7 +140,6 @@ async fn sign_up_as_guest_and_then_update_to_new_cloud_user_test() { assert_eq!(old_workspace.views.len(), new_workspace.views.len()); for (index, view) in old_views.iter().enumerate() { assert_eq!(view.name, new_views[index].name); - assert_eq!(view.id, new_views[index].id); assert_eq!(view.layout, new_views[index].layout); assert_eq!(view.create_time, new_views[index].create_time); } @@ -196,7 +197,7 @@ async fn sign_up_as_guest_and_then_update_to_existing_cloud_user_test() { #[tokio::test] async fn get_user_profile_test() { - if let Some(test) = FlowySupabaseTest::new() { + if let Some(test) = FlowySupabaseTest::new().await { let uuid = uuid::Uuid::new_v4().to_string(); test.supabase_sign_up_with_uuid(&uuid, None).await.unwrap(); @@ -207,7 +208,7 @@ async fn get_user_profile_test() { #[tokio::test] async fn update_user_profile_test() { - if let Some(test) = FlowySupabaseTest::new() { + if let Some(test) = FlowySupabaseTest::new().await { let uuid = uuid::Uuid::new_v4().to_string(); let profile = test.supabase_sign_up_with_uuid(&uuid, None).await.unwrap(); test @@ -221,7 +222,7 @@ async fn update_user_profile_test() { #[tokio::test] async fn update_user_profile_with_existing_email_test() { - if let Some(test) = FlowySupabaseTest::new() { + if let Some(test) = FlowySupabaseTest::new().await { let email = format!("{}@appflowy.io", nanoid!(6)); let _ = test .supabase_sign_up_with_uuid(&uuid::Uuid::new_v4().to_string(), Some(email.clone())) @@ -249,7 +250,7 @@ async fn update_user_profile_with_existing_email_test() { #[tokio::test] async fn migrate_anon_document_on_cloud_signup() { if get_supabase_config().is_some() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let user_profile = test.sign_up_as_guest().await.user_profile; let view = test @@ -295,7 +296,7 @@ async fn migrate_anon_data_on_cloud_signup() { ) .unwrap(); let test = - EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()); + EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()).await; let user_profile = test.supabase_party_sign_up().await; // Get the folder data from remote diff --git a/frontend/rust-lib/event-integration/tests/user/supabase_test/workspace_test.rs b/frontend/rust-lib/event-integration/tests/user/supabase_test/workspace_test.rs index 49e4e289c7c28..2a971ef636351 100644 --- a/frontend/rust-lib/event-integration/tests/user/supabase_test/workspace_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/supabase_test/workspace_test.rs @@ -12,7 +12,7 @@ use crate::util::*; #[tokio::test] async fn initial_workspace_test() { if get_supabase_config().is_some() { - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; let mut map = HashMap::new(); map.insert(USER_UUID.to_string(), uuid::Uuid::new_v4().to_string()); map.insert( diff --git a/frontend/rust-lib/event-integration/tests/util.rs b/frontend/rust-lib/event-integration/tests/util.rs index 37a2b20616d88..7653b39c4c1ec 100644 --- a/frontend/rust-lib/event-integration/tests/util.rs +++ b/frontend/rust-lib/event-integration/tests/util.rs @@ -40,9 +40,9 @@ pub struct FlowySupabaseTest { } impl FlowySupabaseTest { - pub fn new() -> Option { + pub async fn new() -> Option { let _ = get_supabase_config()?; - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; test.set_auth_type(AuthTypePB::Supabase); test.server_provider.set_auth_type(AuthType::Supabase); @@ -71,12 +71,10 @@ impl Deref for FlowySupabaseTest { } pub async fn receive_with_timeout( - receiver: &mut Receiver, + mut receiver: Receiver, duration: Duration, -) -> Result> { - let res = timeout(duration, receiver.recv()) - .await? - .ok_or(anyhow::anyhow!("recv timeout"))?; +) -> Result> { + let res = timeout(duration, receiver.recv()).await.unwrap().unwrap(); Ok(res) } @@ -206,9 +204,9 @@ pub struct AFCloudTest { } impl AFCloudTest { - pub fn new() -> Option { + pub async fn new() -> Option { let _ = get_af_cloud_config()?; - let test = EventIntegrationTest::new(); + let test = EventIntegrationTest::new().await; test.set_auth_type(AuthTypePB::AFCloud); test.server_provider.set_auth_type(AuthType::AFCloud); diff --git a/frontend/rust-lib/flowy-core/Cargo.toml b/frontend/rust-lib/flowy-core/Cargo.toml index a843c8ecabb20..1dbc66e0117ad 100644 --- a/frontend/rust-lib/flowy-core/Cargo.toml +++ b/frontend/rust-lib/flowy-core/Cargo.toml @@ -46,6 +46,7 @@ lib-infra = { path = "../../../shared-lib/lib-infra" } serde = "1.0" serde_json = "1.0" serde_repr = "0.1" +futures = "0.3.28" [features] default = ["rev-sqlite"] @@ -71,4 +72,4 @@ ts = [ ] rev-sqlite = ["flowy-user/rev-sqlite"] openssl_vendored = ["flowy-sqlite/openssl_vendored"] - +single_thread = ["lib-dispatch/single_thread"] diff --git a/frontend/rust-lib/flowy-core/src/integrate/log.rs b/frontend/rust-lib/flowy-core/src/integrate/log.rs index cdcacfe940b81..c6c606a00b1c5 100644 --- a/frontend/rust-lib/flowy-core/src/integrate/log.rs +++ b/frontend/rust-lib/flowy-core/src/integrate/log.rs @@ -34,6 +34,7 @@ pub(crate) fn create_log_filter(level: String, with_crates: Vec) -> Stri filters.push(format!("flowy_notification={}", "info")); filters.push(format!("lib_infra={}", level)); filters.push(format!("flowy_task={}", level)); + // filters.push(format!("lib_dispatch={}", level)); filters.push(format!("dart_ffi={}", "info")); filters.push(format!("flowy_sqlite={}", "info")); diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index de97c12a51b8b..4a7ade1972998 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -5,7 +5,7 @@ use std::time::Duration; use std::{fmt, sync::Arc}; use tokio::sync::RwLock; -use tracing::error; +use tracing::{error, event, instrument}; use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabSource}; use flowy_database2::DatabaseManager; @@ -17,7 +17,7 @@ use flowy_task::{TaskDispatcher, TaskRunner}; use flowy_user::event_map::UserCloudServiceProvider; use flowy_user::manager::{UserManager, UserSessionConfig}; use lib_dispatch::prelude::*; -use lib_dispatch::runtime::tokio_default_runtime; +use lib_dispatch::runtime::AFPluginRuntime; use module::make_plugins; pub use module::*; @@ -82,7 +82,21 @@ pub struct AppFlowyCore { } impl AppFlowyCore { + #[cfg(feature = "single_thread")] + pub async fn new(config: AppFlowyCoreConfig) -> Self { + let runtime = Arc::new(AFPluginRuntime::new().unwrap()); + Self::init(config, runtime).await + } + + #[cfg(not(feature = "single_thread"))] pub fn new(config: AppFlowyCoreConfig) -> Self { + let runtime = Arc::new(AFPluginRuntime::new().unwrap()); + let cloned_runtime = runtime.clone(); + runtime.block_on(Self::init(config, cloned_runtime)) + } + + #[instrument(skip(config, runtime))] + async fn init(config: AppFlowyCoreConfig, runtime: Arc) -> Self { /// The profiling can be used to tracing the performance of the application. /// Check out the [Link](https://appflowy.gitbook.io/docs/essential-documentation/contribute-to-appflowy/architecture/backend/profiling) /// for more information. @@ -95,8 +109,8 @@ impl AppFlowyCore { // Init the key value database let store_preference = Arc::new(StorePreferences::new(&config.storage_path).unwrap()); - tracing::info!("🔥 {:?}", &config); - let runtime = tokio_default_runtime().unwrap(); + tracing::info!("🔥db {:?}", &config); + tracing::debug!("🔥{}", runtime); let task_scheduler = TaskDispatcher::new(Duration::from_secs(2)); let task_dispatcher = Arc::new(RwLock::new(task_scheduler)); runtime.spawn(TaskRunner::run(task_dispatcher.clone())); @@ -108,6 +122,7 @@ impl AppFlowyCore { Arc::downgrade(&store_preference), )); + event!(tracing::Level::DEBUG, "Init managers",); let ( user_manager, folder_manager, @@ -115,7 +130,7 @@ impl AppFlowyCore { database_manager, document_manager, collab_builder, - ) = runtime.block_on(async { + ) = async { /// The shared collab builder is used to build the [Collab] instance. The plugins will be loaded /// on demand based on the [CollabPluginConfig]. let collab_builder = Arc::new(AppFlowyCollabBuilder::new(server_provider.clone())); @@ -162,7 +177,8 @@ impl AppFlowyCore { document_manager, collab_builder, ) - }); + } + .await; let user_status_callback = UserStatusCallbackImpl { collab_builder, @@ -179,17 +195,15 @@ impl AppFlowyCore { }; let cloned_user_session = Arc::downgrade(&user_manager); - runtime.block_on(async move { - if let Some(user_manager) = cloned_user_session.upgrade() { - if let Err(err) = user_manager - .init(user_status_callback, collab_interact_impl) - .await - { - error!("Init user failed: {}", err) - } + if let Some(user_session) = cloned_user_session.upgrade() { + event!(tracing::Level::DEBUG, "init user session",); + if let Err(err) = user_session + .init(user_status_callback, collab_interact_impl) + .await + { + error!("Init user failed: {}", err) } - }); - + } let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || { make_plugins( Arc::downgrade(&folder_manager), diff --git a/frontend/rust-lib/flowy-database2/src/event_handler.rs b/frontend/rust-lib/flowy-database2/src/event_handler.rs index 06547c720ff36..b680364f1e2c7 100644 --- a/frontend/rust-lib/flowy-database2/src/event_handler.rs +++ b/frontend/rust-lib/flowy-database2/src/event_handler.rs @@ -5,7 +5,7 @@ use collab_database::rows::RowId; use tokio::sync::oneshot; use flowy_error::{FlowyError, FlowyResult}; -use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult}; +use lib_dispatch::prelude::{af_spawn, data_result_ok, AFPluginData, AFPluginState, DataResult}; use lib_infra::util::timestamp; use crate::entities::*; @@ -697,7 +697,7 @@ pub(crate) async fn update_group_handler( let database_editor = manager.get_database_with_view_id(&view_id).await?; let group_changeset = GroupChangeset::from(params); let (tx, rx) = oneshot::channel(); - tokio::spawn(async move { + af_spawn(async move { let result = database_editor .update_group(&view_id, vec![group_changeset].into()) .await; diff --git a/frontend/rust-lib/flowy-database2/src/manager.rs b/frontend/rust-lib/flowy-database2/src/manager.rs index 10734ba8a50a9..ec5d1c28092f6 100644 --- a/frontend/rust-lib/flowy-database2/src/manager.rs +++ b/frontend/rust-lib/flowy-database2/src/manager.rs @@ -20,6 +20,7 @@ use collab_integrate::{CollabPersistenceConfig, RocksCollabDB}; use flowy_database_deps::cloud::DatabaseCloudService; use flowy_error::{internal_error, FlowyError, FlowyResult}; use flowy_task::TaskDispatcher; +use lib_dispatch::prelude::af_spawn; use crate::entities::{ DatabaseDescriptionPB, DatabaseLayoutPB, DatabaseSnapshotPB, DidFetchRowPB, @@ -361,7 +362,7 @@ impl DatabaseManager { /// Send notification to all clients that are listening to the given object. fn subscribe_block_event(workspace_database: &WorkspaceDatabase) { let mut block_event_rx = workspace_database.subscribe_block_event(); - tokio::spawn(async move { + af_spawn(async move { while let Ok(event) = block_event_rx.recv().await { match event { BlockEvent::DidFetchRow(row_details) => { diff --git a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs index e5343693de23e..767542d407a0d 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs @@ -12,6 +12,7 @@ use tracing::{event, warn}; use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult}; use flowy_task::TaskDispatcher; +use lib_dispatch::prelude::af_spawn; use lib_infra::future::{to_fut, Fut, FutureResult}; use crate::entities::*; @@ -56,7 +57,7 @@ impl DatabaseEditor { // Receive database sync state and send to frontend via the notification let mut sync_state = database.lock().subscribe_sync_state(); let cloned_database_id = database_id.clone(); - tokio::spawn(async move { + af_spawn(async move { while let Some(sync_state) = sync_state.next().await { send_notification( &cloned_database_id, @@ -69,7 +70,7 @@ impl DatabaseEditor { // Receive database snapshot state and send to frontend via the notification let mut snapshot_state = database.lock().subscribe_snapshot_state(); - tokio::spawn(async move { + af_spawn(async move { while let Some(snapshot_state) = snapshot_state.next().await { if let Some(new_snapshot_id) = snapshot_state.snapshot_id() { tracing::debug!( diff --git a/frontend/rust-lib/flowy-database2/src/services/database_view/view_editor.rs b/frontend/rust-lib/flowy-database2/src/services/database_view/view_editor.rs index 8dfc39416b189..276ef48273737 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database_view/view_editor.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database_view/view_editor.rs @@ -9,6 +9,7 @@ use collab_database::views::{DatabaseLayout, DatabaseView}; use tokio::sync::{broadcast, RwLock}; use flowy_error::{FlowyError, FlowyResult}; +use lib_dispatch::prelude::af_spawn; use crate::entities::{ CalendarEventPB, DatabaseLayoutMetaPB, DatabaseLayoutSettingPB, DeleteFilterParams, @@ -60,7 +61,7 @@ impl DatabaseViewEditor { cell_cache: CellCache, ) -> FlowyResult { let (notifier, _) = broadcast::channel(100); - tokio::spawn(DatabaseViewChangedReceiverRunner(Some(notifier.subscribe())).run()); + af_spawn(DatabaseViewChangedReceiverRunner(Some(notifier.subscribe())).run()); // Group let group_controller = Arc::new(RwLock::new( new_group_controller(view_id.clone(), delegate.clone()).await?, @@ -237,7 +238,7 @@ impl DatabaseViewEditor { let row_id = row_detail.row.id.clone(); let weak_filter_controller = Arc::downgrade(&self.filter_controller); let weak_sort_controller = Arc::downgrade(&self.sort_controller); - tokio::spawn(async move { + af_spawn(async move { if let Some(filter_controller) = weak_filter_controller.upgrade() { filter_controller .did_receive_row_changed(row_id.clone()) @@ -645,7 +646,7 @@ impl DatabaseViewEditor { let filter_type = UpdatedFilterType::new(Some(old), new); let filter_changeset = FilterChangeset::from_update(filter_type); let filter_controller = self.filter_controller.clone(); - tokio::spawn(async move { + af_spawn(async move { if let Some(notification) = filter_controller .did_receive_changes(filter_changeset) .await diff --git a/frontend/rust-lib/flowy-database2/src/services/group/configuration.rs b/frontend/rust-lib/flowy-database2/src/services/group/configuration.rs index f8f1aa8c21379..b71cdddf378b3 100644 --- a/frontend/rust-lib/flowy-database2/src/services/group/configuration.rs +++ b/frontend/rust-lib/flowy-database2/src/services/group/configuration.rs @@ -12,6 +12,7 @@ use serde::Serialize; use tracing::event; use flowy_error::{FlowyError, FlowyResult}; +use lib_dispatch::prelude::af_spawn; use lib_infra::future::Fut; use crate::entities::{GroupChangesPB, GroupPB, InsertedGroupPB}; @@ -415,7 +416,7 @@ where let configuration = (*self.setting).clone(); let writer = self.writer.clone(); let view_id = self.view_id.clone(); - tokio::spawn(async move { + af_spawn(async move { match writer.save_configuration(&view_id, configuration).await { Ok(_) => {}, Err(e) => { diff --git a/frontend/rust-lib/flowy-database2/tests/database/database_editor.rs b/frontend/rust-lib/flowy-database2/tests/database/database_editor.rs index 852b7ae1e8567..0a52c1538e1b8 100644 --- a/frontend/rust-lib/flowy-database2/tests/database/database_editor.rs +++ b/frontend/rust-lib/flowy-database2/tests/database/database_editor.rs @@ -37,7 +37,7 @@ pub struct DatabaseEditorTest { impl DatabaseEditorTest { pub async fn new_grid() -> Self { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let _ = sdk.init_anon_user().await; let params = make_test_grid(); @@ -46,7 +46,7 @@ impl DatabaseEditorTest { } pub async fn new_no_date_grid() -> Self { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let _ = sdk.init_anon_user().await; let params = make_no_date_test_grid(); @@ -55,7 +55,7 @@ impl DatabaseEditorTest { } pub async fn new_board() -> Self { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let _ = sdk.init_anon_user().await; let params = make_test_board(); @@ -64,7 +64,7 @@ impl DatabaseEditorTest { } pub async fn new_calendar() -> Self { - let sdk = EventIntegrationTest::new(); + let sdk = EventIntegrationTest::new().await; let _ = sdk.init_anon_user().await; let params = make_test_calendar(); diff --git a/frontend/rust-lib/flowy-database2/tests/database/filter_test/script.rs b/frontend/rust-lib/flowy-database2/tests/database/filter_test/script.rs index de07820b6c43b..423761d17652a 100644 --- a/frontend/rust-lib/flowy-database2/tests/database/filter_test/script.rs +++ b/frontend/rust-lib/flowy-database2/tests/database/filter_test/script.rs @@ -14,6 +14,7 @@ use flowy_database2::entities::{CheckboxFilterConditionPB, CheckboxFilterPB, Che use flowy_database2::services::database_view::DatabaseViewChanged; use flowy_database2::services::field::SelectOption; use flowy_database2::services::filter::FilterType; +use lib_dispatch::prelude::af_spawn; use crate::database::database_editor::DatabaseEditorTest; @@ -278,7 +279,7 @@ impl DatabaseFilterTest { if change.is_none() {return;} let change = change.unwrap(); let mut receiver = self.recv.take().unwrap(); - tokio::spawn(async move { + af_spawn(async move { match tokio::time::timeout(Duration::from_secs(2), receiver.recv()).await { Ok(changed) => { match changed.unwrap() { DatabaseViewChanged::FilterNotification(notification) => { diff --git a/frontend/rust-lib/flowy-document2/src/document.rs b/frontend/rust-lib/flowy-document2/src/document.rs index bfa90980a7424..2f56b28bdee64 100644 --- a/frontend/rust-lib/flowy-document2/src/document.rs +++ b/frontend/rust-lib/flowy-document2/src/document.rs @@ -9,6 +9,7 @@ use futures::StreamExt; use parking_lot::Mutex; use flowy_error::FlowyResult; +use lib_dispatch::prelude::af_spawn; use crate::entities::{DocEventPB, DocumentSnapshotStatePB, DocumentSyncStatePB}; use crate::notification::{send_notification, DocumentNotification}; @@ -61,7 +62,7 @@ fn subscribe_document_changed(doc_id: &str, document: &MutexDocument) { fn subscribe_document_snapshot_state(collab: &Arc) { let document_id = collab.lock().object_id.clone(); let mut snapshot_state = collab.lock().subscribe_snapshot_state(); - tokio::spawn(async move { + af_spawn(async move { while let Some(snapshot_state) = snapshot_state.next().await { if let Some(new_snapshot_id) = snapshot_state.snapshot_id() { tracing::debug!("Did create document remote snapshot: {}", new_snapshot_id); @@ -79,7 +80,7 @@ fn subscribe_document_snapshot_state(collab: &Arc) { fn subscribe_document_sync_state(collab: &Arc) { let document_id = collab.lock().object_id.clone(); let mut sync_state_stream = collab.lock().subscribe_sync_state(); - tokio::spawn(async move { + af_spawn(async move { while let Some(sync_state) = sync_state_stream.next().await { send_notification( &document_id, diff --git a/frontend/rust-lib/flowy-folder2/src/manager.rs b/frontend/rust-lib/flowy-folder2/src/manager.rs index b15ce1b2c56c6..7a2b37c8c224e 100644 --- a/frontend/rust-lib/flowy-folder2/src/manager.rs +++ b/frontend/rust-lib/flowy-folder2/src/manager.rs @@ -18,6 +18,7 @@ use collab_integrate::collab_builder::AppFlowyCollabBuilder; use collab_integrate::{CollabPersistenceConfig, RocksCollabDB, YrsDocAction}; use flowy_error::{ErrorCode, FlowyError, FlowyResult}; use flowy_folder_deps::cloud::{gen_view_id, FolderCloudService}; +use lib_dispatch::prelude::af_spawn; use crate::entities::icon::UpdateViewIconParams; use crate::entities::{ @@ -1027,7 +1028,7 @@ fn subscribe_folder_view_changed( weak_mutex_folder: &Weak, ) { let weak_mutex_folder = weak_mutex_folder.clone(); - tokio::spawn(async move { + af_spawn(async move { while let Ok(value) = rx.recv().await { if let Some(folder) = weak_mutex_folder.upgrade() { tracing::trace!("Did receive view change: {:?}", value); @@ -1065,7 +1066,7 @@ fn subscribe_folder_snapshot_state_changed( weak_mutex_folder: &Weak, ) { let weak_mutex_folder = weak_mutex_folder.clone(); - tokio::spawn(async move { + af_spawn(async move { if let Some(mutex_folder) = weak_mutex_folder.upgrade() { let stream = mutex_folder .lock() @@ -1093,7 +1094,7 @@ fn subscribe_folder_sync_state_changed( mut folder_sync_state_rx: WatchStream, _weak_mutex_folder: &Weak, ) { - tokio::spawn(async move { + af_spawn(async move { while let Some(state) = folder_sync_state_rx.next().await { send_notification(&workspace_id, FolderNotification::DidUpdateFolderSyncUpdate) .payload(FolderSyncStatePB::from(state)) @@ -1108,7 +1109,7 @@ fn subscribe_folder_trash_changed( weak_mutex_folder: &Weak, ) { let weak_mutex_folder = weak_mutex_folder.clone(); - tokio::spawn(async move { + af_spawn(async move { while let Ok(value) = rx.recv().await { if let Some(folder) = weak_mutex_folder.upgrade() { let mut unique_ids = HashSet::new(); diff --git a/frontend/rust-lib/flowy-server/Cargo.toml b/frontend/rust-lib/flowy-server/Cargo.toml index ccdc0a7b5681f..cfa3007ef057c 100644 --- a/frontend/rust-lib/flowy-server/Cargo.toml +++ b/frontend/rust-lib/flowy-server/Cargo.toml @@ -44,6 +44,7 @@ url = "2.4" tokio-util = "0.7" tokio-stream = { version = "0.1.14", features = ["sync"] } client-api = { version = "0.1.0", features = ["collab-sync"] } +lib-dispatch = { workspace = true } [dev-dependencies] uuid = { version = "1.3.3", features = ["v4"] } diff --git a/frontend/rust-lib/flowy-server/src/af_cloud/server.rs b/frontend/rust-lib/flowy-server/src/af_cloud/server.rs index f2257586b8289..09542ee9c190b 100644 --- a/frontend/rust-lib/flowy-server/src/af_cloud/server.rs +++ b/frontend/rust-lib/flowy-server/src/af_cloud/server.rs @@ -19,6 +19,7 @@ use flowy_server_config::af_cloud_config::AFCloudConfiguration; use flowy_storage::FileStorageService; use flowy_user_deps::cloud::UserCloudService; use flowy_user_deps::entities::UserTokenState; +use lib_dispatch::prelude::af_spawn; use lib_infra::future::FutureResult; use crate::af_cloud::impls::{ @@ -94,7 +95,7 @@ impl AppFlowyServer for AFCloudServer { let mut token_state_rx = self.client.subscribe_token_state(); let (watch_tx, watch_rx) = watch::channel(UserTokenState::Invalid); let weak_client = Arc::downgrade(&self.client); - tokio::spawn(async move { + af_spawn(async move { while let Ok(token_state) = token_state_rx.recv().await { if let Some(client) = weak_client.upgrade() { match token_state { @@ -185,7 +186,7 @@ fn spawn_ws_conn( let weak_api_client = Arc::downgrade(api_client); let enable_sync = enable_sync.clone(); - tokio::spawn(async move { + af_spawn(async move { if let Some(ws_client) = weak_ws_client.upgrade() { let mut state_recv = ws_client.subscribe_connect_state(); while let Ok(state) = state_recv.recv().await { @@ -215,7 +216,7 @@ fn spawn_ws_conn( let weak_device_id = Arc::downgrade(device_id); let weak_ws_client = Arc::downgrade(ws_client); let weak_api_client = Arc::downgrade(api_client); - tokio::spawn(async move { + af_spawn(async move { while let Ok(token_state) = token_state_rx.recv().await { match token_state { TokenState::Refresh => { diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/database.rs b/frontend/rust-lib/flowy-server/src/supabase/api/database.rs index 963978c2fca28..8b38ad62fda9d 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/database.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/database.rs @@ -5,6 +5,7 @@ use tokio::sync::oneshot::channel; use flowy_database_deps::cloud::{ CollabObjectUpdate, CollabObjectUpdateByOid, DatabaseCloudService, DatabaseSnapshot, }; +use lib_dispatch::prelude::af_spawn; use lib_infra::future::FutureResult; use crate::supabase::api::request::{ @@ -35,7 +36,7 @@ where let try_get_postgrest = self.server.try_get_weak_postgrest(); let object_id = object_id.to_string(); let (tx, rx) = channel(); - tokio::spawn(async move { + af_spawn(async move { tx.send( async move { let postgrest = try_get_postgrest?; @@ -58,7 +59,7 @@ where ) -> FutureResult { let try_get_postgrest = self.server.try_get_weak_postgrest(); let (tx, rx) = channel(); - tokio::spawn(async move { + af_spawn(async move { tx.send( async move { let postgrest = try_get_postgrest?; diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/document.rs b/frontend/rust-lib/flowy-server/src/supabase/api/document.rs index 9968a1c44b2d4..baa140d9cc942 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/document.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/document.rs @@ -7,6 +7,7 @@ use tokio::sync::oneshot::channel; use flowy_document_deps::cloud::{DocumentCloudService, DocumentSnapshot}; use flowy_error::FlowyError; +use lib_dispatch::prelude::af_spawn; use lib_infra::future::FutureResult; use crate::supabase::api::request::{get_snapshots_from_server, FetchObjectUpdateAction}; @@ -35,7 +36,7 @@ where let try_get_postgrest = self.server.try_get_weak_postgrest(); let document_id = document_id.to_string(); let (tx, rx) = channel(); - tokio::spawn(async move { + af_spawn(async move { tx.send( async move { let postgrest = try_get_postgrest?; @@ -85,7 +86,7 @@ where let try_get_postgrest = self.server.try_get_weak_postgrest(); let document_id = document_id.to_string(); let (tx, rx) = channel(); - tokio::spawn(async move { + af_spawn(async move { tx.send( async move { let postgrest = try_get_postgrest?; diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/folder.rs b/frontend/rust-lib/flowy-server/src/supabase/api/folder.rs index a8147792c31c6..c3e15678c164e 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/folder.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/folder.rs @@ -10,6 +10,7 @@ use tokio::sync::oneshot::channel; use flowy_folder_deps::cloud::{ gen_workspace_id, Folder, FolderCloudService, FolderData, FolderSnapshot, Workspace, }; +use lib_dispatch::prelude::af_spawn; use lib_infra::future::FutureResult; use crate::response::ExtendedResponse; @@ -116,7 +117,7 @@ where let try_get_postgrest = self.server.try_get_weak_postgrest(); let workspace_id = workspace_id.to_string(); let (tx, rx) = channel(); - tokio::spawn(async move { + af_spawn(async move { tx.send( async move { let postgrest = try_get_postgrest?; diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/user.rs b/frontend/rust-lib/flowy-server/src/supabase/api/user.rs index a08f6f3ef0e79..3f446bfcf8b6f 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/user.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/user.rs @@ -21,6 +21,7 @@ use flowy_folder_deps::cloud::{Folder, Workspace}; use flowy_user_deps::cloud::*; use flowy_user_deps::entities::*; use flowy_user_deps::DEFAULT_USER_NAME; +use lib_dispatch::prelude::af_spawn; use lib_infra::box_any::BoxAny; use lib_infra::future::FutureResult; use lib_infra::util::timestamp; @@ -238,7 +239,7 @@ where let try_get_postgrest = self.server.try_get_weak_postgrest(); let awareness_id = uid.to_string(); let (tx, rx) = channel(); - tokio::spawn(async move { + af_spawn(async move { tx.send( async move { let postgrest = try_get_postgrest?; @@ -278,7 +279,7 @@ where let try_get_postgrest = self.server.try_get_weak_postgrest(); let (tx, rx) = channel(); let init_update = empty_workspace_update(&collab_object); - tokio::spawn(async move { + af_spawn(async move { tx.send( async move { let postgrest = try_get_postgrest? @@ -316,7 +317,7 @@ where let try_get_postgrest = self.server.try_get_weak_postgrest(); let cloned_collab_object = collab_object.clone(); let (tx, rx) = channel(); - tokio::spawn(async move { + af_spawn(async move { tx.send( async move { CreateCollabAction::new(cloned_collab_object, try_get_postgrest?, update) diff --git a/frontend/rust-lib/flowy-user/src/event_handler.rs b/frontend/rust-lib/flowy-user/src/event_handler.rs index e83320b219f67..35bc71bbdc2d5 100644 --- a/frontend/rust-lib/flowy-user/src/event_handler.rs +++ b/frontend/rust-lib/flowy-user/src/event_handler.rs @@ -93,7 +93,7 @@ pub async fn get_user_profile_handler( let cloned_user_profile = user_profile.clone(); // Refresh the user profile in the background - tokio::spawn(async move { + af_spawn(async move { if let Some(manager) = weak_manager.upgrade() { let _ = manager.refresh_user_profile(&cloned_user_profile).await; } diff --git a/frontend/rust-lib/flowy-user/src/manager.rs b/frontend/rust-lib/flowy-user/src/manager.rs index 4fb483e6e9a99..be9ae8a992694 100644 --- a/frontend/rust-lib/flowy-user/src/manager.rs +++ b/frontend/rust-lib/flowy-user/src/manager.rs @@ -16,6 +16,7 @@ use flowy_sqlite::ConnectionPool; use flowy_sqlite::{query_dsl::*, DBConnection, ExpressionMethods}; use flowy_user_deps::cloud::UserUpdate; use flowy_user_deps::entities::*; +use lib_dispatch::prelude::af_spawn; use lib_infra::box_any::BoxAny; use crate::entities::{AuthStateChangedPB, AuthStatePB, UserProfilePB, UserSettingPB}; @@ -93,7 +94,7 @@ impl UserManager { let weak_user_manager = Arc::downgrade(&user_manager); if let Ok(user_service) = user_manager.cloud_services.get_user_service() { if let Some(mut rx) = user_service.subscribe_user_update() { - tokio::spawn(async move { + af_spawn(async move { while let Ok(update) = rx.recv().await { if let Some(user_manager) = weak_user_manager.upgrade() { if let Err(err) = user_manager.handler_user_update(update).await { @@ -133,7 +134,7 @@ impl UserManager { // Subscribe the token state let weak_pool = Arc::downgrade(&self.db_pool(user.uid)?); if let Some(mut token_state_rx) = self.cloud_services.subscribe_token_state() { - tokio::spawn(async move { + af_spawn(async move { while let Some(token_state) = token_state_rx.next().await { match token_state { UserTokenState::Refresh { token } => { @@ -401,7 +402,7 @@ impl UserManager { self.set_session(None)?; let server = self.cloud_services.get_user_service()?; - tokio::spawn(async move { + af_spawn(async move { if let Err(err) = server.sign_out(None).await { event!(tracing::Level::ERROR, "{:?}", err); } @@ -536,7 +537,7 @@ impl UserManager { params: UpdateUserProfileParams, ) -> Result<(), FlowyError> { let server = self.cloud_services.get_user_service()?; - tokio::spawn(async move { + af_spawn(async move { let credentials = UserCredentials::new(Some(token), Some(uid), None); server.update_user(credentials, params).await }) diff --git a/frontend/rust-lib/flowy-user/src/services/user_workspace.rs b/frontend/rust-lib/flowy-user/src/services/user_workspace.rs index 6002556d9cad8..4112c5438e19b 100644 --- a/frontend/rust-lib/flowy-user/src/services/user_workspace.rs +++ b/frontend/rust-lib/flowy-user/src/services/user_workspace.rs @@ -7,6 +7,7 @@ use flowy_error::{FlowyError, FlowyResult}; use flowy_sqlite::schema::user_workspace_table; use flowy_sqlite::{query_dsl::*, ConnectionPool, ExpressionMethods}; use flowy_user_deps::entities::{Role, UserWorkspace, WorkspaceMember}; +use lib_dispatch::prelude::af_spawn; use crate::entities::{RepeatedUserWorkspacePB, ResetWorkspacePB}; use crate::manager::UserManager; @@ -99,7 +100,7 @@ impl UserManager { if let Ok(service) = self.cloud_services.get_user_service() { if let Ok(pool) = self.db_pool(uid) { - tokio::spawn(async move { + af_spawn(async move { if let Ok(new_user_workspaces) = service.get_all_user_workspaces(uid).await { let _ = save_user_workspaces(uid, pool, &new_user_workspaces); let repeated_workspace_pbs = RepeatedUserWorkspacePB::from(new_user_workspaces); diff --git a/frontend/rust-lib/lib-dispatch/Cargo.toml b/frontend/rust-lib/lib-dispatch/Cargo.toml index 99d22d7448a24..0ae9fa340ef89 100644 --- a/frontend/rust-lib/lib-dispatch/Cargo.toml +++ b/frontend/rust-lib/lib-dispatch/Cargo.toml @@ -22,17 +22,18 @@ serde_json = {version = "1.0", optional = true } serde = { version = "1.0", features = ["derive"], optional = true } serde_repr = { version = "0.1", optional = true } validator = "0.16.1" +tracing = { version = "0.1"} #optional crate bincode = { version = "1.3", optional = true} protobuf = {version = "2.28.0", optional = true} -tracing = { version = "0.1"} [dev-dependencies] tokio = { version = "1.26", features = ["full"] } futures-util = "0.3.26" [features] -default = ["use_protobuf"] +default = ["use_protobuf", ] use_serde = ["bincode", "serde_json", "serde", "serde_repr"] use_protobuf= ["protobuf"] +single_thread = [] diff --git a/frontend/rust-lib/lib-dispatch/src/byte_trait.rs b/frontend/rust-lib/lib-dispatch/src/byte_trait.rs index 4548fd3cab6ed..6e8996e824e67 100644 --- a/frontend/rust-lib/lib-dispatch/src/byte_trait.rs +++ b/frontend/rust-lib/lib-dispatch/src/byte_trait.rs @@ -1,6 +1,7 @@ -use crate::errors::{DispatchError, InternalError}; use bytes::Bytes; +use crate::errors::{DispatchError, InternalError}; + // To bytes pub trait ToBytes { fn into_bytes(self) -> Result; @@ -26,21 +27,6 @@ where } } -// #[cfg(feature = "use_serde")] -// impl ToBytes for T -// where -// T: serde::Serialize, -// { -// fn into_bytes(self) -> Result { -// match serde_json::to_string(&self.0) { -// Ok(s) => Ok(Bytes::from(s)), -// Err(e) => Err(InternalError::SerializeToBytes(format!("{:?}", e)).into()), -// } -// } -// } - -// From bytes - pub trait AFPluginFromBytes: Sized { fn parse_from_bytes(bytes: Bytes) -> Result; } diff --git a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs index dfd0d1dcc62da..a7a05c7b92e75 100644 --- a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs +++ b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs @@ -1,3 +1,13 @@ +use std::any::Any; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{future::Future, sync::Arc}; + +use derivative::*; +use pin_project::pin_project; +use tracing::event; + +use crate::module::AFPluginStateMap; use crate::runtime::AFPluginRuntime; use crate::{ errors::{DispatchError, Error, InternalError}, @@ -5,20 +15,76 @@ use crate::{ response::AFPluginEventResponse, service::{AFPluginServiceFactory, Service}, }; -use derivative::*; -use futures_core::future::BoxFuture; -use futures_util::task::Context; -use pin_project::pin_project; -use std::{future::Future, sync::Arc}; -use tokio::macros::support::{Pin, Poll}; + +#[cfg(feature = "single_thread")] +pub trait AFConcurrent {} + +#[cfg(feature = "single_thread")] +impl AFConcurrent for T where T: ?Sized {} + +#[cfg(not(feature = "single_thread"))] +pub trait AFConcurrent: Send + Sync {} + +#[cfg(not(feature = "single_thread"))] +impl AFConcurrent for T where T: Send + Sync {} + +#[cfg(feature = "single_thread")] +pub type AFBoxFuture<'a, T> = futures_core::future::LocalBoxFuture<'a, T>; + +#[cfg(not(feature = "single_thread"))] +pub type AFBoxFuture<'a, T> = futures_core::future::BoxFuture<'a, T>; + +pub type AFStateMap = std::sync::Arc; + +#[cfg(feature = "single_thread")] +pub(crate) fn downcast_owned(boxed: AFBox) -> Option { + boxed.downcast().ok().map(|boxed| *boxed) +} + +#[cfg(not(feature = "single_thread"))] +pub(crate) fn downcast_owned(boxed: AFBox) -> Option { + boxed.downcast().ok().map(|boxed| *boxed) +} + +#[cfg(feature = "single_thread")] +pub(crate) type AFBox = Box; + +#[cfg(not(feature = "single_thread"))] +pub(crate) type AFBox = Box; + +#[cfg(feature = "single_thread")] +pub type BoxFutureCallback = + Box AFBoxFuture<'static, ()> + 'static>; + +#[cfg(not(feature = "single_thread"))] +pub type BoxFutureCallback = + Box AFBoxFuture<'static, ()> + Send + Sync + 'static>; + +#[cfg(feature = "single_thread")] +pub fn af_spawn(future: T) -> tokio::task::JoinHandle +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + tokio::spawn(future) +} + +#[cfg(not(feature = "single_thread"))] +pub fn af_spawn(future: T) -> tokio::task::JoinHandle +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + tokio::spawn(future) +} pub struct AFPluginDispatcher { plugins: AFPluginMap, - runtime: AFPluginRuntime, + runtime: Arc, } impl AFPluginDispatcher { - pub fn construct(runtime: AFPluginRuntime, module_factory: F) -> AFPluginDispatcher + pub fn construct(runtime: Arc, module_factory: F) -> AFPluginDispatcher where F: FnOnce() -> Vec, { @@ -30,24 +96,77 @@ impl AFPluginDispatcher { } } - pub fn async_send( + pub async fn async_send( + dispatch: Arc, + request: Req, + ) -> AFPluginEventResponse + where + Req: Into, + { + AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await + } + + pub async fn async_send_with_callback( + dispatch: Arc, + request: Req, + callback: Callback, + ) -> AFPluginEventResponse + where + Req: Into, + Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static, + { + let request: AFPluginRequest = request.into(); + let plugins = dispatch.plugins.clone(); + let service = Box::new(DispatchService { plugins }); + tracing::trace!("Async event: {:?}", &request.event); + let service_ctx = DispatchContext { + request, + callback: Some(Box::new(callback)), + }; + + // Spawns a future onto the runtime. + // + // This spawns the given future onto the runtime's executor, usually a + // thread pool. The thread pool is then responsible for polling the future + // until it completes. + // + // The provided future will start running in the background immediately + // when `spawn` is called, even if you don't await the returned + // `JoinHandle`. + let handle = dispatch.runtime.spawn(async move { + service.call(service_ctx).await.unwrap_or_else(|e| { + tracing::error!("Dispatch runtime error: {:?}", e); + InternalError::Other(format!("{:?}", e)).as_response() + }) + }); + + let result = dispatch.runtime.run_until(handle).await; + result.unwrap_or_else(|e| { + let msg = format!("EVENT_DISPATCH join error: {:?}", e); + tracing::error!("{}", msg); + let error = InternalError::JoinError(msg); + error.as_response() + }) + } + + pub fn box_async_send( dispatch: Arc, request: Req, ) -> DispatchFuture where - Req: std::convert::Into, + Req: Into + 'static, { - AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})) + AFPluginDispatcher::boxed_async_send_with_callback(dispatch, request, |_| Box::pin(async {})) } - pub fn async_send_with_callback( + pub fn boxed_async_send_with_callback( dispatch: Arc, request: Req, callback: Callback, ) -> DispatchFuture where - Req: std::convert::Into, - Callback: FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync, + Req: Into + 'static, + Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static, { let request: AFPluginRequest = request.into(); let plugins = dispatch.plugins.clone(); @@ -57,7 +176,17 @@ impl AFPluginDispatcher { request, callback: Some(Box::new(callback)), }; - let join_handle = dispatch.runtime.spawn(async move { + + // Spawns a future onto the runtime. + // + // This spawns the given future onto the runtime's executor, usually a + // thread pool. The thread pool is then responsible for polling the future + // until it completes. + // + // The provided future will start running in the background immediately + // when `spawn` is called, even if you don't await the returned + // `JoinHandle`. + let handle = dispatch.runtime.spawn(async move { service.call(service_ctx).await.unwrap_or_else(|e| { tracing::error!("Dispatch runtime error: {:?}", e); InternalError::Other(format!("{:?}", e)).as_response() @@ -66,7 +195,8 @@ impl AFPluginDispatcher { DispatchFuture { fut: Box::pin(async move { - join_handle.await.unwrap_or_else(|e| { + let result = dispatch.runtime.run_until(handle).await; + result.unwrap_or_else(|e| { let msg = format!("EVENT_DISPATCH join error: {:?}", e); tracing::error!("{}", msg); let error = InternalError::JoinError(msg); @@ -76,44 +206,56 @@ impl AFPluginDispatcher { } } + #[cfg(not(feature = "single_thread"))] pub fn sync_send( dispatch: Arc, request: AFPluginRequest, ) -> AFPluginEventResponse { - futures::executor::block_on(async { - AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await - }) + futures::executor::block_on(AFPluginDispatcher::async_send_with_callback( + dispatch, + request, + |_| Box::pin(async {}), + )) } - pub fn spawn(&self, f: F) + #[cfg(feature = "single_thread")] + #[track_caller] + pub fn spawn(&self, future: F) -> tokio::task::JoinHandle where - F: Future + Send + 'static, + F: Future + 'static, { - self.runtime.spawn(f); + self.runtime.spawn(future) } -} -#[pin_project] -pub struct DispatchFuture { - #[pin] - pub fut: Pin + Sync + Send>>, -} + #[cfg(not(feature = "single_thread"))] + #[track_caller] + pub fn spawn(&self, future: F) -> tokio::task::JoinHandle + where + F: Future + Send + 'static, + ::Output: Send + 'static, + { + self.runtime.spawn(future) + } -impl Future for DispatchFuture -where - T: Send + Sync, -{ - type Output = T; + #[cfg(feature = "single_thread")] + pub async fn run_until(&self, future: F) -> F::Output + where + F: Future + 'static, + { + let handle = self.runtime.spawn(future); + self.runtime.run_until(handle).await.unwrap() + } - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - Poll::Ready(futures_core::ready!(this.fut.poll(cx))) + #[cfg(not(feature = "single_thread"))] + pub async fn run_until<'a, F>(&self, future: F) -> F::Output + where + F: Future + Send + 'a, + ::Output: Send + 'a, + { + self.runtime.run_until(future).await } } -pub type BoxFutureCallback = - Box BoxFuture<'static, ()> + 'static + Send + Sync>; - #[derive(Derivative)] #[derivative(Debug)] pub struct DispatchContext { @@ -136,36 +278,37 @@ pub(crate) struct DispatchService { impl Service for DispatchService { type Response = AFPluginEventResponse; type Error = DispatchError; - type Future = BoxFuture<'static, Result>; + type Future = AFBoxFuture<'static, Result>; - #[cfg_attr( - feature = "use_tracing", - tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx)) - )] + #[tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))] fn call(&self, ctx: DispatchContext) -> Self::Future { let module_map = self.plugins.clone(); let (request, callback) = ctx.into_parts(); Box::pin(async move { let result = { - // print_module_map_info(&module_map); match module_map.get(&request.event) { Some(module) => { - tracing::trace!("Handle event: {:?} by {:?}", &request.event, module.name); + event!( + tracing::Level::TRACE, + "Handle event: {:?} by {:?}", + &request.event, + module.name + ); let fut = module.new_service(()); let service_fut = fut.await?.call(request); service_fut.await }, None => { let msg = format!("Can not find the event handler. {:?}", request); - tracing::error!("{}", msg); + event!(tracing::Level::ERROR, "{}", msg); Err(InternalError::HandleNotFound(msg).into()) }, } }; let response = result.unwrap_or_else(|e| e.into()); - tracing::trace!("Dispatch result: {:?}", response); + event!(tracing::Level::TRACE, "Dispatch result: {:?}", response); if let Some(callback) = callback { callback(response.clone()).await; } @@ -190,3 +333,21 @@ fn print_plugins(plugins: &AFPluginMap) { tracing::info!("Event: {:?} plugin : {:?}", k, v.name); }) } + +#[pin_project] +pub struct DispatchFuture { + #[pin] + pub fut: Pin + 'static>>, +} + +impl Future for DispatchFuture +where + T: AFConcurrent + 'static, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().project(); + Poll::Ready(futures_core::ready!(this.fut.poll(cx))) + } +} diff --git a/frontend/rust-lib/lib-dispatch/src/errors/errors.rs b/frontend/rust-lib/lib-dispatch/src/errors/errors.rs index cbfd8542ecf68..19f0e336c1600 100644 --- a/frontend/rust-lib/lib-dispatch/src/errors/errors.rs +++ b/frontend/rust-lib/lib-dispatch/src/errors/errors.rs @@ -1,15 +1,17 @@ +use std::fmt; + +use bytes::Bytes; +use dyn_clone::DynClone; +use tokio::{sync::mpsc::error::SendError, task::JoinError}; + +use crate::prelude::AFConcurrent; use crate::{ byte_trait::AFPluginFromBytes, request::AFPluginEventRequest, response::{AFPluginEventResponse, ResponseBuilder}, }; -use bytes::Bytes; -use dyn_clone::DynClone; - -use std::fmt; -use tokio::{sync::mpsc::error::SendError, task::JoinError}; -pub trait Error: fmt::Debug + DynClone + Send + Sync { +pub trait Error: fmt::Debug + DynClone + AFConcurrent { fn as_response(&self) -> AFPluginEventResponse; } diff --git a/frontend/rust-lib/lib-dispatch/src/module/container.rs b/frontend/rust-lib/lib-dispatch/src/module/container.rs index a95b3b702ec53..d6fdf24d6795d 100644 --- a/frontend/rust-lib/lib-dispatch/src/module/container.rs +++ b/frontend/rust-lib/lib-dispatch/src/module/container.rs @@ -1,10 +1,9 @@ -use std::{ - any::{Any, TypeId}, - collections::HashMap, -}; +use std::{any::TypeId, collections::HashMap}; + +use crate::prelude::{downcast_owned, AFBox, AFConcurrent}; #[derive(Default, Debug)] -pub struct AFPluginStateMap(HashMap>); +pub struct AFPluginStateMap(HashMap); impl AFPluginStateMap { #[inline] @@ -14,7 +13,7 @@ impl AFPluginStateMap { pub fn insert(&mut self, val: T) -> Option where - T: 'static + Send + Sync, + T: 'static + AFConcurrent, { self .0 @@ -24,14 +23,14 @@ impl AFPluginStateMap { pub fn remove(&mut self) -> Option where - T: 'static + Send + Sync, + T: 'static + AFConcurrent, { self.0.remove(&TypeId::of::()).and_then(downcast_owned) } pub fn get(&self) -> Option<&T> where - T: 'static + Send + Sync, + T: 'static, { self .0 @@ -41,7 +40,7 @@ impl AFPluginStateMap { pub fn get_mut(&mut self) -> Option<&mut T> where - T: 'static + Send + Sync, + T: 'static + AFConcurrent, { self .0 @@ -51,7 +50,7 @@ impl AFPluginStateMap { pub fn contains(&self) -> bool where - T: 'static + Send + Sync, + T: 'static + AFConcurrent, { self.0.contains_key(&TypeId::of::()) } @@ -60,7 +59,3 @@ impl AFPluginStateMap { self.0.extend(other.0); } } - -fn downcast_owned(boxed: Box) -> Option { - boxed.downcast().ok().map(|boxed| *boxed) -} diff --git a/frontend/rust-lib/lib-dispatch/src/module/data.rs b/frontend/rust-lib/lib-dispatch/src/module/data.rs index 809954668893d..1d71299274622 100644 --- a/frontend/rust-lib/lib-dispatch/src/module/data.rs +++ b/frontend/rust-lib/lib-dispatch/src/module/data.rs @@ -1,15 +1,17 @@ +use std::{any::type_name, ops::Deref, sync::Arc}; + +use crate::prelude::AFConcurrent; use crate::{ errors::{DispatchError, InternalError}, request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest}, util::ready::{ready, Ready}, }; -use std::{any::type_name, ops::Deref, sync::Arc}; -pub struct AFPluginState(Arc); +pub struct AFPluginState(Arc); impl AFPluginState where - T: Send + Sync, + T: AFConcurrent, { pub fn new(data: T) -> Self { AFPluginState(Arc::new(data)) @@ -22,7 +24,7 @@ where impl Deref for AFPluginState where - T: ?Sized + Send + Sync, + T: ?Sized + AFConcurrent, { type Target = Arc; @@ -33,7 +35,7 @@ where impl Clone for AFPluginState where - T: ?Sized + Send + Sync, + T: ?Sized + AFConcurrent, { fn clone(&self) -> AFPluginState { AFPluginState(self.0.clone()) @@ -42,7 +44,7 @@ where impl From> for AFPluginState where - T: ?Sized + Send + Sync, + T: ?Sized + AFConcurrent, { fn from(arc: Arc) -> Self { AFPluginState(arc) @@ -51,7 +53,7 @@ where impl FromAFPluginRequest for AFPluginState where - T: ?Sized + Send + Sync + 'static, + T: ?Sized + AFConcurrent + 'static, { type Error = DispatchError; type Future = Ready>; @@ -59,7 +61,7 @@ where #[inline] fn from_request(req: &AFPluginEventRequest, _: &mut Payload) -> Self::Future { if let Some(state) = req.get_state::>() { - ready(Ok(state.clone())) + ready(Ok(state)) } else { let msg = format!( "Failed to get the plugin state of type: {}", diff --git a/frontend/rust-lib/lib-dispatch/src/module/mod.rs b/frontend/rust-lib/lib-dispatch/src/module/mod.rs index 7c8d1a344068a..9527a890b353b 100644 --- a/frontend/rust-lib/lib-dispatch/src/module/mod.rs +++ b/frontend/rust-lib/lib-dispatch/src/module/mod.rs @@ -1,4 +1,5 @@ #![allow(clippy::module_inception)] + pub use container::*; pub use data::*; pub use module::*; diff --git a/frontend/rust-lib/lib-dispatch/src/module/module.rs b/frontend/rust-lib/lib-dispatch/src/module/module.rs index 09c72fe245a16..0eb162b515269 100644 --- a/frontend/rust-lib/lib-dispatch/src/module/module.rs +++ b/frontend/rust-lib/lib-dispatch/src/module/module.rs @@ -9,15 +9,15 @@ use std::{ task::{Context, Poll}, }; -use futures_core::future::BoxFuture; use futures_core::ready; use nanoid::nanoid; use pin_project::pin_project; +use crate::dispatcher::AFConcurrent; +use crate::prelude::{AFBoxFuture, AFStateMap}; use crate::service::AFPluginHandler; use crate::{ errors::{DispatchError, InternalError}, - module::{container::AFPluginStateMap, AFPluginState}, request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest}, response::{AFPluginEventResponse, AFPluginResponder}, service::{ @@ -58,7 +58,7 @@ pub struct AFPlugin { pub name: String, /// a list of `AFPluginState` that the plugin registers. The state can be read by the plugin's handler. - states: Arc, + states: AFStateMap, /// Contains a list of factories that are used to generate the services used to handle the passed-in /// `ServiceRequest`. @@ -72,7 +72,7 @@ impl std::default::Default for AFPlugin { fn default() -> Self { Self { name: "".to_owned(), - states: Arc::new(AFPluginStateMap::new()), + states: Default::default(), event_service_factory: Arc::new(HashMap::new()), } } @@ -88,11 +88,10 @@ impl AFPlugin { self } - pub fn state(mut self, data: D) -> Self { + pub fn state(mut self, data: D) -> Self { Arc::get_mut(&mut self.states) .unwrap() - .insert(AFPluginState::new(data)); - + .insert(crate::module::AFPluginState::new(data)); self } @@ -100,9 +99,9 @@ impl AFPlugin { pub fn event(mut self, event: E, handler: H) -> Self where H: AFPluginHandler, - T: FromAFPluginRequest + 'static + Send + Sync, - ::Future: Sync + Send, - R: Future + 'static + Send + Sync, + T: FromAFPluginRequest + 'static + AFConcurrent, + ::Future: AFConcurrent, + R: Future + AFConcurrent + 'static, R::Output: AFPluginResponder + 'static, E: Eq + Hash + Debug + Clone + Display, { @@ -169,7 +168,7 @@ impl AFPluginServiceFactory for AFPlugin { type Error = DispatchError; type Service = BoxService; type Context = (); - type Future = BoxFuture<'static, Result>; + type Future = AFBoxFuture<'static, Result>; fn new_service(&self, _cfg: Self::Context) -> Self::Future { let services = self.event_service_factory.clone(); @@ -185,13 +184,14 @@ pub struct AFPluginService { services: Arc< HashMap>, >, - states: Arc, + states: AFStateMap, } impl Service for AFPluginService { type Response = AFPluginEventResponse; type Error = DispatchError; - type Future = BoxFuture<'static, Result>; + + type Future = AFBoxFuture<'static, Result>; fn call(&self, request: AFPluginRequest) -> Self::Future { let AFPluginRequest { id, event, payload } = request; @@ -224,7 +224,7 @@ impl Service for AFPluginService { #[pin_project] pub struct AFPluginServiceFuture { #[pin] - fut: BoxFuture<'static, Result>, + fut: AFBoxFuture<'static, Result>, } impl Future for AFPluginServiceFuture { diff --git a/frontend/rust-lib/lib-dispatch/src/request/payload.rs b/frontend/rust-lib/lib-dispatch/src/request/payload.rs index c371537f36748..6fc2a98e992bb 100644 --- a/frontend/rust-lib/lib-dispatch/src/request/payload.rs +++ b/frontend/rust-lib/lib-dispatch/src/request/payload.rs @@ -1,9 +1,7 @@ -use bytes::Bytes; use std::{fmt, fmt::Formatter}; -pub enum PayloadError {} +use bytes::Bytes; -// TODO: support stream data #[derive(Clone)] #[cfg_attr(feature = "use_serde", derive(serde::Serialize))] pub enum Payload { diff --git a/frontend/rust-lib/lib-dispatch/src/request/request.rs b/frontend/rust-lib/lib-dispatch/src/request/request.rs index 20af0bbc02c6d..45fb6a7d00293 100644 --- a/frontend/rust-lib/lib-dispatch/src/request/request.rs +++ b/frontend/rust-lib/lib-dispatch/src/request/request.rs @@ -1,19 +1,20 @@ use std::future::Future; +use std::{ + fmt::Debug, + pin::Pin, + task::{Context, Poll}, +}; + +use derivative::*; +use futures_core::ready; +use crate::prelude::{AFConcurrent, AFStateMap}; use crate::{ errors::{DispatchError, InternalError}, - module::{AFPluginEvent, AFPluginStateMap}, + module::AFPluginEvent, request::payload::Payload, util::ready::{ready, Ready}, }; -use derivative::*; -use futures_core::ready; -use std::{ - fmt::Debug, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; #[derive(Clone, Debug, Derivative)] pub struct AFPluginEventRequest { @@ -21,27 +22,27 @@ pub struct AFPluginEventRequest { pub(crate) id: String, pub(crate) event: AFPluginEvent, #[derivative(Debug = "ignore")] - pub(crate) states: Arc, + pub(crate) states: AFStateMap, } impl AFPluginEventRequest { - pub fn new(id: String, event: E, module_data: Arc) -> AFPluginEventRequest + pub fn new(id: String, event: E, states: AFStateMap) -> AFPluginEventRequest where E: Into, { Self { id, event: event.into(), - states: module_data, + states, } } - pub fn get_state(&self) -> Option<&T> + pub fn get_state(&self) -> Option where - T: Send + Sync, + T: AFConcurrent + 'static + Clone, { if let Some(data) = self.states.get::() { - return Some(data); + return Some(data.clone()); } None diff --git a/frontend/rust-lib/lib-dispatch/src/runtime.rs b/frontend/rust-lib/lib-dispatch/src/runtime.rs index 656612b359cb8..691c862250093 100644 --- a/frontend/rust-lib/lib-dispatch/src/runtime.rs +++ b/frontend/rust-lib/lib-dispatch/src/runtime.rs @@ -1,24 +1,117 @@ -use std::{io, thread}; +use std::fmt::{Display, Formatter}; +use std::future::Future; +use std::io; + use tokio::runtime; +use tokio::runtime::Runtime; +use tokio::task::JoinHandle; + +pub struct AFPluginRuntime { + inner: Runtime, + #[cfg(feature = "single_thread")] + local: tokio::task::LocalSet, +} + +impl Display for AFPluginRuntime { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if cfg!(feature = "single_thread") { + write!(f, "Runtime(single_thread)") + } else { + write!(f, "Runtime(multi_thread)") + } + } +} + +impl AFPluginRuntime { + pub fn new() -> io::Result { + let inner = default_tokio_runtime()?; + Ok(Self { + inner, + #[cfg(feature = "single_thread")] + local: tokio::task::LocalSet::new(), + }) + } + + #[cfg(feature = "single_thread")] + #[track_caller] + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + 'static, + { + self.local.spawn_local(future) + } + + #[cfg(not(feature = "single_thread"))] + #[track_caller] + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + ::Output: Send + 'static, + { + self.inner.spawn(future) + } -pub type AFPluginRuntime = tokio::runtime::Runtime; + #[cfg(feature = "single_thread")] + pub async fn run_until(&self, future: F) -> F::Output + where + F: Future, + { + self.local.run_until(future).await + } + + #[cfg(not(feature = "single_thread"))] + pub async fn run_until(&self, future: F) -> F::Output + where + F: Future, + { + future.await + } + + #[cfg(feature = "single_thread")] + #[track_caller] + pub fn block_on(&self, f: F) -> F::Output + where + F: Future, + { + self.local.block_on(&self.inner, f) + } + + #[cfg(not(feature = "single_thread"))] + #[track_caller] + pub fn block_on(&self, f: F) -> F::Output + where + F: Future, + { + self.inner.block_on(f) + } +} + +#[cfg(feature = "single_thread")] +pub fn default_tokio_runtime() -> io::Result { + runtime::Builder::new_current_thread() + .thread_name("dispatch-rt-st") + .enable_io() + .enable_time() + .build() +} -pub fn tokio_default_runtime() -> io::Result { +#[cfg(not(feature = "single_thread"))] +pub fn default_tokio_runtime() -> io::Result { runtime::Builder::new_multi_thread() - .thread_name("dispatch-rt") + .thread_name("dispatch-rt-mt") .enable_io() .enable_time() .on_thread_start(move || { tracing::trace!( "{:?} thread started: thread_id= {}", - thread::current(), + std::thread::current(), thread_id::get() ); }) .on_thread_stop(move || { tracing::trace!( "{:?} thread stopping: thread_id= {}", - thread::current(), + std::thread::current(), thread_id::get(), ); }) diff --git a/frontend/rust-lib/lib-dispatch/src/service/boxed.rs b/frontend/rust-lib/lib-dispatch/src/service/boxed.rs index 6d2a72e8439b1..76780e1d30fe5 100644 --- a/frontend/rust-lib/lib-dispatch/src/service/boxed.rs +++ b/frontend/rust-lib/lib-dispatch/src/service/boxed.rs @@ -1,21 +1,33 @@ +use crate::prelude::{AFBoxFuture, AFConcurrent}; use crate::service::{AFPluginServiceFactory, Service}; -use futures_core::future::BoxFuture; pub fn factory(factory: SF) -> BoxServiceFactory where - SF: AFPluginServiceFactory + 'static + Sync + Send, + SF: AFPluginServiceFactory + 'static + AFConcurrent, Req: 'static, SF::Response: 'static, SF::Service: 'static, SF::Future: 'static, - SF::Error: 'static + Send + Sync, - >::Service: Sync + Send, - <>::Service as Service>::Future: Send + Sync, - >::Future: Send + Sync, + SF::Error: 'static, + >::Service: AFConcurrent, + <>::Service as Service>::Future: AFConcurrent, + >::Future: AFConcurrent, { BoxServiceFactory(Box::new(FactoryWrapper(factory))) } +#[cfg(feature = "single_thread")] +type Inner = Box< + dyn AFPluginServiceFactory< + Req, + Context = Cfg, + Response = Res, + Error = Err, + Service = BoxService, + Future = AFBoxFuture<'static, Result, Err>>, + >, +>; +#[cfg(not(feature = "single_thread"))] type Inner = Box< dyn AFPluginServiceFactory< Req, @@ -23,9 +35,9 @@ type Inner = Box< Response = Res, Error = Err, Service = BoxService, - Future = BoxFuture<'static, Result, Err>>, - > + Sync - + Send, + Future = AFBoxFuture<'static, Result, Err>>, + > + Send + + Sync, >; pub struct BoxServiceFactory(Inner); @@ -39,15 +51,21 @@ where type Error = Err; type Service = BoxService; type Context = Cfg; - type Future = BoxFuture<'static, Result>; + type Future = AFBoxFuture<'static, Result>; fn new_service(&self, cfg: Cfg) -> Self::Future { self.0.new_service(cfg) } } +#[cfg(feature = "single_thread")] +pub type BoxService = Box< + dyn Service>>, +>; + +#[cfg(not(feature = "single_thread"))] pub type BoxService = Box< - dyn Service>> + dyn Service>> + Sync + Send, >; @@ -88,11 +106,11 @@ impl ServiceWrapper { impl Service for ServiceWrapper where S: Service, - S::Future: 'static + Send + Sync, + S::Future: 'static + AFConcurrent, { type Response = Res; type Error = Err; - type Future = BoxFuture<'static, Result>; + type Future = AFBoxFuture<'static, Result>; fn call(&self, req: Req) -> Self::Future { Box::pin(self.inner.call(req)) @@ -108,15 +126,15 @@ where Err: 'static, SF: AFPluginServiceFactory, SF::Future: 'static, - SF::Service: 'static + Send + Sync, - <>::Service as Service>::Future: Send + Sync + 'static, - >::Future: Send + Sync, + SF::Service: 'static + AFConcurrent, + <>::Service as Service>::Future: AFConcurrent + 'static, + >::Future: AFConcurrent, { type Response = Res; type Error = Err; type Service = BoxService; type Context = Cfg; - type Future = BoxFuture<'static, Result>; + type Future = AFBoxFuture<'static, Result>; fn new_service(&self, cfg: Cfg) -> Self::Future { let f = self.0.new_service(cfg); diff --git a/frontend/rust-lib/lib-dispatch/src/service/handler.rs b/frontend/rust-lib/lib-dispatch/src/service/handler.rs index c55d4d0b166e7..d231ed27f17fd 100644 --- a/frontend/rust-lib/lib-dispatch/src/service/handler.rs +++ b/frontend/rust-lib/lib-dispatch/src/service/handler.rs @@ -8,18 +8,19 @@ use std::{ use futures_core::ready; use pin_project::pin_project; +use crate::dispatcher::AFConcurrent; use crate::{ errors::DispatchError, - request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest}, + request::{AFPluginEventRequest, FromAFPluginRequest}, response::{AFPluginEventResponse, AFPluginResponder}, service::{AFPluginServiceFactory, Service, ServiceRequest, ServiceResponse}, util::ready::*, }; /// A closure that is run every time for the specified plugin event -pub trait AFPluginHandler: Clone + 'static + Sync + Send +pub trait AFPluginHandler: Clone + AFConcurrent + 'static where - R: Future + Send + Sync, + R: Future + AFConcurrent, R::Output: AFPluginResponder, { fn call(&self, param: T) -> R; @@ -29,7 +30,7 @@ pub struct AFPluginHandlerService where H: AFPluginHandler, T: FromAFPluginRequest, - R: Future + Sync + Send, + R: Future + AFConcurrent, R::Output: AFPluginResponder, { handler: H, @@ -40,7 +41,7 @@ impl AFPluginHandlerService where H: AFPluginHandler, T: FromAFPluginRequest, - R: Future + Sync + Send, + R: Future + AFConcurrent, R::Output: AFPluginResponder, { pub fn new(handler: H) -> Self { @@ -55,7 +56,7 @@ impl Clone for AFPluginHandlerService where H: AFPluginHandler, T: FromAFPluginRequest, - R: Future + Sync + Send, + R: Future + AFConcurrent, R::Output: AFPluginResponder, { fn clone(&self) -> Self { @@ -70,7 +71,7 @@ impl AFPluginServiceFactory for AFPluginHandlerService< where F: AFPluginHandler, T: FromAFPluginRequest, - R: Future + Send + Sync, + R: Future + AFConcurrent, R::Output: AFPluginResponder, { type Response = ServiceResponse; @@ -88,7 +89,7 @@ impl Service for AFPluginHandlerService where H: AFPluginHandler, T: FromAFPluginRequest, - R: Future + Sync + Send, + R: Future + AFConcurrent, R::Output: AFPluginResponder, { type Response = ServiceResponse; @@ -107,7 +108,7 @@ pub enum HandlerServiceFuture where H: AFPluginHandler, T: FromAFPluginRequest, - R: Future + Sync + Send, + R: Future + AFConcurrent, R::Output: AFPluginResponder, { Extract(#[pin] T::Future, Option, H), @@ -118,7 +119,7 @@ impl Future for HandlerServiceFuture where F: AFPluginHandler, T: FromAFPluginRequest, - R: Future + Sync + Send, + R: Future + AFConcurrent, R::Output: AFPluginResponder, { type Output = Result; @@ -154,8 +155,8 @@ where macro_rules! factory_tuple ({ $($param:ident)* } => { impl AFPluginHandler<($($param,)*), Res> for Func - where Func: Fn($($param),*) -> Res + Clone + 'static + Sync + Send, - Res: Future + Sync + Send, + where Func: Fn($($param),*) -> Res + Clone + 'static + AFConcurrent, + Res: Future + AFConcurrent, Res::Output: AFPluginResponder, { #[allow(non_snake_case)] @@ -181,7 +182,7 @@ macro_rules! tuple_from_req ({$tuple_type:ident, $(($n:tt, $T:ident)),+} => { type Error = DispatchError; type Future = $tuple_type<$($T),+>; - fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future { + fn from_request(req: &AFPluginEventRequest, payload: &mut crate::prelude::Payload) -> Self::Future { $tuple_type { items: <($(Option<$T>,)+)>::default(), futs: FromRequestFutures($($T::from_request(req, payload),)+), diff --git a/frontend/rust-lib/lib-dispatch/tests/api/module.rs b/frontend/rust-lib/lib-dispatch/tests/api/module.rs index cf68fa583b482..4e105a8257e6d 100644 --- a/frontend/rust-lib/lib-dispatch/tests/api/module.rs +++ b/frontend/rust-lib/lib-dispatch/tests/api/module.rs @@ -1,7 +1,8 @@ -use lib_dispatch::prelude::*; -use lib_dispatch::runtime::tokio_default_runtime; use std::sync::Arc; +use lib_dispatch::prelude::*; +use lib_dispatch::runtime::AFPluginRuntime; + pub async fn hello() -> String { "say hello".to_string() } @@ -9,7 +10,7 @@ pub async fn hello() -> String { #[tokio::test] async fn test() { let event = "1"; - let runtime = tokio_default_runtime().unwrap(); + let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let dispatch = Arc::new(AFPluginDispatcher::construct(runtime, || { vec![AFPlugin::new().event(event, hello)] })); diff --git a/frontend/rust-lib/lib-log/Cargo.toml b/frontend/rust-lib/lib-log/Cargo.toml index a02dcbed73cd3..483a4f403e21b 100644 --- a/frontend/rust-lib/lib-log/Cargo.toml +++ b/frontend/rust-lib/lib-log/Cargo.toml @@ -7,10 +7,10 @@ edition = "2018" [dependencies] -tracing-log = { version = "0.1.3"} -tracing-subscriber = { version = "0.2.25", features = ["registry", "env-filter", "ansi", "json"] } -tracing-bunyan-formatter = "0.2.6" -tracing-appender = "0.1" +tracing-log = { version = "0.2"} +tracing-subscriber = { version = "0.3.17", features = ["registry", "env-filter", "ansi", "json"] } +tracing-bunyan-formatter = "0.3.9" +tracing-appender = "0.2.2" tracing-core = "0.1" tracing = { version = "0.1", features = ["log"] } log = "0.4.17" diff --git a/frontend/rust-lib/lib-log/src/layer.rs b/frontend/rust-lib/lib-log/src/layer.rs index 870223c0cf0c0..b8db7aeb54ca0 100644 --- a/frontend/rust-lib/lib-log/src/layer.rs +++ b/frontend/rust-lib/lib-log/src/layer.rs @@ -4,7 +4,7 @@ use serde::ser::{SerializeMap, Serializer}; use serde_json::Value; use tracing::{Event, Id, Subscriber}; use tracing_bunyan_formatter::JsonStorage; -use tracing_core::{metadata::Level, span::Attributes}; +use tracing_core::metadata::Level; use tracing_subscriber::{fmt::MakeWriter, layer::Context, registry::SpanRef, Layer}; const LEVEL: &str = "level"; @@ -17,17 +17,22 @@ const LOG_TARGET_PATH: &str = "log.target"; const RESERVED_FIELDS: [&str; 3] = [LEVEL, TIME, MESSAGE]; const IGNORE_FIELDS: [&str; 2] = [LOG_MODULE_PATH, LOG_TARGET_PATH]; -pub struct FlowyFormattingLayer { +pub struct FlowyFormattingLayer<'a, W: MakeWriter<'static> + 'static> { make_writer: W, with_target: bool, + phantom: std::marker::PhantomData<&'a ()>, } -impl FlowyFormattingLayer { +impl<'a, W> FlowyFormattingLayer<'a, W> +where + W: for<'writer> MakeWriter<'writer> + 'static, +{ #[allow(dead_code)] pub fn new(make_writer: W) -> Self { Self { make_writer, with_target: false, + phantom: std::marker::PhantomData, } } @@ -43,9 +48,9 @@ impl FlowyFormattingLayer { Ok(()) } - fn serialize_span tracing_subscriber::registry::LookupSpan<'a>>( + fn serialize_span tracing_subscriber::registry::LookupSpan<'b>>( &self, - span: &SpanRef, + span: &SpanRef<'a, S>, ty: Type, ctx: &Context<'_, S>, ) -> Result, std::io::Error> { @@ -86,6 +91,7 @@ impl FlowyFormattingLayer { /// The type of record we are dealing with: entering a span, exiting a span, an /// event. +#[allow(dead_code)] #[derive(Clone, Debug)] pub enum Type { EnterSpan, @@ -104,8 +110,8 @@ impl fmt::Display for Type { } } -fn format_span_context tracing_subscriber::registry::LookupSpan<'a>>( - span: &SpanRef, +fn format_span_context<'b, S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>>( + span: &SpanRef<'b, S>, ty: Type, context: &Context<'_, S>, ) -> String { @@ -153,10 +159,10 @@ fn format_event_message tracing_subscriber::registry::Lo message } -impl Layer for FlowyFormattingLayer +impl Layer for FlowyFormattingLayer<'static, W> where S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, - W: MakeWriter + 'static, + W: for<'writer> MakeWriter<'writer> + 'static, { fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { // Events do not necessarily happen in the context of a span, hence @@ -221,13 +227,6 @@ where } } - fn new_span(&self, _attrs: &Attributes, id: &Id, ctx: Context<'_, S>) { - let span = ctx.span(id).expect("Span not found, this is a bug"); - if let Ok(serialized) = self.serialize_span(&span, Type::EnterSpan, &ctx) { - let _ = self.emit(serialized); - } - } - fn on_close(&self, id: Id, ctx: Context<'_, S>) { let span = ctx.span(&id).expect("Span not found, this is a bug"); if let Ok(serialized) = self.serialize_span(&span, Type::ExitSpan, &ctx) { diff --git a/frontend/rust-lib/lib-log/src/lib.rs b/frontend/rust-lib/lib-log/src/lib.rs index 86203e6378efd..69271ebec1dd6 100644 --- a/frontend/rust-lib/lib-log/src/lib.rs +++ b/frontend/rust-lib/lib-log/src/lib.rs @@ -1,16 +1,15 @@ use std::sync::RwLock; use lazy_static::lazy_static; -use log::LevelFilter; use tracing::subscriber::set_global_default; use tracing_appender::{non_blocking::WorkerGuard, rolling::RollingFileAppender}; use tracing_bunyan_formatter::JsonStorageLayer; -use tracing_log::LogTracer; use tracing_subscriber::{layer::SubscriberExt, EnvFilter}; use crate::layer::FlowyFormattingLayer; mod layer; + lazy_static! { static ref LOG_GUARD: RwLock> = RwLock::new(None); } @@ -47,48 +46,17 @@ impl Builder { .with_ansi(true) .with_target(true) .with_max_level(tracing::Level::TRACE) + .with_thread_ids(false) + .with_file(false) .with_writer(std::io::stderr) - .with_thread_ids(true) - .json() - .with_current_span(true) - .with_span_list(true) - .compact() + .pretty() + .with_env_filter(env_filter) .finish() - .with(env_filter) .with(JsonStorageLayer) - .with(FlowyFormattingLayer::new(std::io::stdout)) .with(FlowyFormattingLayer::new(non_blocking)); set_global_default(subscriber).map_err(|e| format!("{:?}", e))?; - LogTracer::builder() - .with_max_level(LevelFilter::Trace) - .init() - .map_err(|e| format!("{:?}", e))?; - *LOG_GUARD.write().unwrap() = Some(guard); Ok(()) } } - -#[cfg(test)] -mod tests { - use super::*; - - // run cargo test --features="use_bunyan" or cargo test - #[test] - fn test_log() { - Builder::new("flowy", ".") - .env_filter("debug") - .build() - .unwrap(); - tracing::info!("😁 tracing::info call"); - log::debug!("😁 log::debug call"); - - say("hello world"); - } - - #[tracing::instrument(level = "trace", name = "say")] - fn say(s: &str) { - tracing::info!("{}", s); - } -}