diff --git a/.github/workflows/ci_linux.yaml b/.github/workflows/ci_linux.yaml index 660d6933..e3faa5b4 100644 --- a/.github/workflows/ci_linux.yaml +++ b/.github/workflows/ci_linux.yaml @@ -18,7 +18,7 @@ jobs: build: strategy: matrix: - rust-version: [stable, 1.75] + rust-version: [stable, 1.81] runs-on: ubuntu-latest @@ -28,14 +28,6 @@ jobs: - name: Setup rust run: rustup default ${{ matrix.rust-version }} - # As new versions of our dependencies come out, they might depend on newer - # versions of the Rust compiler. When that happens, we'll use this step to - # lock down the dependency to a version that is known to be compatible with - # compiler version 1.75. - - name: Patch dependencies - if: ${{ matrix.rust-version == 1.75 }} - run: ./scripts/patch-versions-msrv-1_75.sh - - name: Build default features run: cargo build --workspace - name: Test default features diff --git a/Cargo.toml b/Cargo.toml index 8a05bd05..af477d15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bevy_impulse" -version = "0.0.2" +version = "0.3.0" edition = "2021" authors = ["Grey "] license = "Apache-2.0" @@ -17,18 +17,18 @@ categories = [ [dependencies] bevy_impulse_derive = { path = "macros", version = "0.0.2" } -bevy_ecs = "0.12" -bevy_utils = "0.12" -bevy_hierarchy = "0.12" -bevy_derive = "0.12" -bevy_app = "0.12" +bevy_ecs = "0.15" +bevy_utils = "0.15" +bevy_hierarchy = "0.15" +bevy_derive = "0.15" +bevy_app = "0.15" async-task = { version = "4.7.1", optional = true } # TODO(@mxgrey) We could probably remove bevy_tasks when the single_threaded_async # feature is active, but we'd have to refactor some internal usage of # bevy_tasks::Task, so we're leaving it as a mandatory dependency for now. -bevy_tasks = { version = "0.12", features = ["multi-threaded"] } +bevy_tasks = { version = "0.15", features = ["multi_threaded"] } itertools = "0.13" smallvec = "1.13" @@ -43,8 +43,8 @@ thiserror = "1.0" # the testing module for doctests, and doctests can only # make use of default features, so we're a bit stuck with # these for now. -bevy_core = "0.12" -bevy_time = "0.12" +bevy_core = "0.15" +bevy_time = "0.15" schemars = { version = "0.8.21", optional = true } serde = { version = "1.0.210", features = ["derive", "rc"], optional = true } @@ -55,7 +55,7 @@ strum = { version = "0.26.3", optional = true, features = ["derive"] } semver = { version = "1.0.24", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] -uuid = { version = "1.13.1", default-features = false, features = ["js"] } +uuid = { version = "1.12", default-features = false, features = ["js"] } [features] single_threaded_async = ["dep:async-task"] diff --git a/examples/diagram/calculator/Cargo.toml b/examples/diagram/calculator/Cargo.toml index 8e89632b..b0e08b58 100644 --- a/examples/diagram/calculator/Cargo.toml +++ b/examples/diagram/calculator/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "calculator" -version = "0.1.0" +version = "0.3.0" edition = "2021" [dependencies] -bevy_app = "0.12" -bevy_core = "0.12" -bevy_impulse = { version = "0.0.2", path = "../../..", features = ["diagram"] } -bevy_time = "0.12" +bevy_app = "0.15" +bevy_core = "0.15" +bevy_impulse = { version = "0.3.0", path = "../../..", features = ["diagram"] } +bevy_time = "0.15" clap = { version = "4.5.23", features = ["derive"] } serde_json = "1.0.128" tracing-subscriber = "0.3.19" diff --git a/examples/diagram/calculator/src/main.rs b/examples/diagram/calculator/src/main.rs index 2321f0cf..dc743d13 100644 --- a/examples/diagram/calculator/src/main.rs +++ b/examples/diagram/calculator/src/main.rs @@ -46,7 +46,7 @@ fn main() -> Result<(), Box> { let request = serde_json::Value::from_str(&args.request)?; let mut promise = - app.world + app.world_mut() .command(|cmds| -> Result, DiagramError> { let workflow = diagram.spawn_io_workflow(cmds, ®istry)?; Ok(cmds.request(request, workflow).take_response()) diff --git a/src/async_execution/single_threaded_execution.rs b/src/async_execution/single_threaded_execution.rs index ffd8ebc4..f2354f7c 100644 --- a/src/async_execution/single_threaded_execution.rs +++ b/src/async_execution/single_threaded_execution.rs @@ -18,7 +18,7 @@ use bevy_ecs::prelude::World; use async_task::Runnable; -pub(crate) use bevy_tasks::Task as TaskHandle; +pub(crate) use bevy_tasks::{Task as TaskHandle, TaskPool}; use tokio::sync::mpsc::{ unbounded_channel, UnboundedReceiver as TokioReceiver, UnboundedSender as TokioSender, }; @@ -99,12 +99,7 @@ impl SingleThreadedExecution { where T: Send + 'static, { - let sender = self.runnable_sender.clone(); - let (runnable, task) = async_task::spawn_local(future, move |runnable| { - sender.send(runnable).ok(); - }); - let _ = self.runnable_sender.send(runnable); - TaskHandle::new(task) + TaskPool::new().spawn_local(future) } pub(crate) fn cancel_sender(&self) -> SingleThreadedExecutionSender { diff --git a/src/buffer.rs b/src/buffer.rs index 1c962681..344e6b14 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -78,7 +78,7 @@ impl Buffer { let target = builder.commands.spawn(UnusedTarget).id(); builder .commands - .add(OnNewBufferValue::new(self.id(), target)); + .queue(OnNewBufferValue::new(self.id(), target)); Chain::new(target, builder) } @@ -359,7 +359,10 @@ where } impl<'w, 's, T: 'static + Send + Sync> BufferAccess<'w, 's, T> { - pub fn get<'a>(&'a self, key: &BufferKey) -> Result, QueryEntityError> { + pub fn get<'a>( + &'a self, + key: &BufferKey, + ) -> Result, QueryEntityError<'a>> { let session = key.session(); self.query .get(key.buffer()) @@ -388,7 +391,10 @@ impl<'w, 's, T> BufferAccessMut<'w, 's, T> where T: 'static + Send + Sync, { - pub fn get<'a>(&'a self, key: &BufferKey) -> Result, QueryEntityError> { + pub fn get<'a>( + &'a self, + key: &BufferKey, + ) -> Result, QueryEntityError<'a>> { let session = key.session(); self.query .get(key.buffer()) @@ -402,7 +408,7 @@ where pub fn get_mut<'a>( &'a mut self, key: &BufferKey, - ) -> Result, QueryEntityError> { + ) -> Result, QueryEntityError<'a>> { let buffer = key.buffer(); let session = key.session(); let accessor = key.tag.accessor; @@ -442,7 +448,7 @@ impl BufferWorldAccess for World { { let buffer_ref = self .get_entity(key.tag.buffer) - .ok_or(BufferError::BufferMissing)?; + .map_err(|_| BufferError::BufferMissing)?; let storage = buffer_ref .get::>() .ok_or(BufferError::BufferMissing)?; @@ -734,7 +740,7 @@ where { fn drop(&mut self) { if self.modified { - self.commands.add(NotifyBufferUpdate::new( + self.commands.queue(NotifyBufferUpdate::new( self.buffer, self.session, self.accessor, diff --git a/src/buffer/any_buffer.rs b/src/buffer/any_buffer.rs index ebfd8943..5276a101 100644 --- a/src/buffer/any_buffer.rs +++ b/src/buffer/any_buffer.rs @@ -520,7 +520,7 @@ impl<'w, 's, 'a> AnyBufferMut<'w, 's, 'a> { impl<'w, 's, 'a> Drop for AnyBufferMut<'w, 's, 'a> { fn drop(&mut self) { if self.modified { - self.commands.add(NotifyBufferUpdate::new( + self.commands.queue(NotifyBufferUpdate::new( self.buffer, self.session, self.accessor, @@ -1003,7 +1003,7 @@ impl AnyBufferAccessInterface for AnyBufferAcces ) -> Result, BufferError> { let buffer_ref = world .get_entity(key.tag.buffer) - .ok_or(BufferError::BufferMissing)?; + .map_err(|_| BufferError::BufferMissing)?; let storage = buffer_ref .get::>() .ok_or(BufferError::BufferMissing)?; diff --git a/src/buffer/bufferable.rs b/src/buffer/bufferable.rs index daf55738..aab3231b 100644 --- a/src/buffer/bufferable.rs +++ b/src/buffer/bufferable.rs @@ -186,7 +186,7 @@ pub trait IterBufferable { let buffers = self.into_buffer_vec::(builder); let join = builder.commands.spawn(()).id(); let target = builder.commands.spawn(UnusedTarget).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(builder.scope()), join, Join::new(buffers, target), diff --git a/src/buffer/buffering.rs b/src/buffer/buffering.rs index 81eea5f1..8e6626ea 100644 --- a/src/buffer/buffering.rs +++ b/src/buffer/buffering.rs @@ -68,7 +68,7 @@ pub trait Joining: Buffering { let join = builder.commands.spawn(()).id(); let target = builder.commands.spawn(UnusedTarget).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(scope), join, Join::new(self, target), @@ -101,7 +101,7 @@ pub trait Accessing: Buffering { let listen = builder.commands.spawn(()).id(); let target = builder.commands.spawn(UnusedTarget).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(scope), listen, Listen::new(self, target), @@ -113,7 +113,7 @@ pub trait Accessing: Buffering { fn access(self, builder: &mut Builder) -> Node { let source = builder.commands.spawn(()).id(); let target = builder.commands.spawn(UnusedTarget).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(builder.scope), source, OperateBufferAccess::::new(self, target), @@ -189,7 +189,7 @@ pub trait Accessing: Buffering { let begin_cancel = builder.commands.spawn(()).set_parent(builder.scope).id(); self.verify_scope(builder.scope); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( None, begin_cancel, BeginCleanupWorkflow::::new( diff --git a/src/buffer/json_buffer.rs b/src/buffer/json_buffer.rs index 68cb1c95..248aff85 100644 --- a/src/buffer/json_buffer.rs +++ b/src/buffer/json_buffer.rs @@ -458,7 +458,7 @@ impl<'w, 's, 'a> JsonBufferMut<'w, 's, 'a> { impl<'w, 's, 'a> Drop for JsonBufferMut<'w, 's, 'a> { fn drop(&mut self) { if self.modified { - self.commands.add(NotifyBufferUpdate::new( + self.commands.queue(NotifyBufferUpdate::new( self.buffer, self.session, self.accessor, @@ -922,7 +922,7 @@ impl JsonBufferAccessIn ) -> Result, BufferError> { let buffer_ref = world .get_entity(key.tag.buffer) - .ok_or(BufferError::BufferMissing)?; + .map_err(|_| BufferError::BufferMissing)?; let storage = buffer_ref .get::>() .ok_or(BufferError::BufferMissing)?; diff --git a/src/builder.rs b/src/builder.rs index a4009aab..4615d2c6 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -145,7 +145,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { /// Connect the output of one into the input slot of another node. pub fn connect(&mut self, output: Output, input: InputSlot) { assert_eq!(output.scope(), input.scope()); - self.commands.add(Connect { + self.commands.queue(Connect { original_target: output.id(), new_target: input.id(), }); @@ -159,7 +159,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { settings: BufferSettings, ) -> Buffer { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateBuffer::::new(settings), @@ -222,7 +222,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { T: Clone + 'static + Send + Sync, { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, ForkClone::::new(ForkTargetStorage::new()), @@ -258,7 +258,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let target_ok = self.commands.spawn(UnusedTarget).id(); let target_err = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, make_result_branching::(ForkTargetStorage::from_iter([target_ok, target_err])), @@ -287,7 +287,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let target_some = self.commands.spawn(UnusedTarget).id(); let target_none = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, make_option_branching::(ForkTargetStorage::from_iter([target_some, target_none])), @@ -390,7 +390,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let source = self.commands.spawn(()).id(); let target = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, Collect::::new(target, min, max), @@ -427,7 +427,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { T: 'static + Send + Sync + Splittable, { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateSplit::::default(), @@ -450,7 +450,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { T: 'static + Send + Sync + ToString, { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateCancel::::new(), @@ -466,7 +466,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { /// input value that triggered it, use [`Self::create_cancel`]. pub fn create_quiet_cancel(&mut self) -> InputSlot<()> { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateQuietCancel, @@ -582,7 +582,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let source = self.commands.spawn(()).id(); let target = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, Trim::::new(branches, target), @@ -613,7 +613,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let source = self.commands.spawn(()).id(); let target = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateDynamicGate::::new(buffers, target), @@ -642,7 +642,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let source = self.commands.spawn(()).id(); let target = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateStaticGate::::new(buffers, target, action), @@ -752,7 +752,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let mut map = StreamTargetMap::default(); let (bundle, streams) = Streams::spawn_node_streams(source, &mut map, self); self.commands.entity(source).insert((bundle, map)); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, Injection::::new(target), diff --git a/src/builder/connect.rs b/src/builder/connect.rs index 80c8819c..9fff88b7 100644 --- a/src/builder/connect.rs +++ b/src/builder/connect.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Entity, World}, - system::Command, + world::Command, }; use bevy_hierarchy::prelude::DespawnRecursiveExt; diff --git a/src/callback.rs b/src/callback.rs index 7d52b0b6..2c15e95d 100644 --- a/src/callback.rs +++ b/src/callback.rs @@ -267,7 +267,7 @@ pub trait CallbackTrait { pub struct BlockingCallbackMarker(std::marker::PhantomData); struct BlockingCallbackSystem { - system: BoxedSystem, Response>, + system: BoxedSystem>, Response>, initialized: bool, } @@ -319,7 +319,7 @@ where pub struct AsyncCallbackMarker(std::marker::PhantomData); struct AsyncCallbackSystem { - system: BoxedSystem, Task>, + system: BoxedSystem>, Task>, initialized: bool, } @@ -389,7 +389,7 @@ pub trait AsCallback { impl AsCallback> for Sys where - Sys: IntoSystem, Response, M>, + Sys: IntoSystem>, Response, M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamPack, @@ -409,7 +409,7 @@ where impl AsCallback> for Sys where - Sys: IntoSystem, Task, M>, + Sys: IntoSystem>, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -510,7 +510,7 @@ pub trait IntoBlockingCallback { impl IntoBlockingCallback> for Sys where - Sys: IntoSystem, + Sys: IntoSystem, Response, M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, { @@ -551,7 +551,7 @@ pub trait IntoAsyncCallback { impl IntoAsyncCallback> for Sys where - Sys: IntoSystem, + Sys: IntoSystem, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -602,7 +602,7 @@ where target: Entity, commands: &mut Commands, ) { - commands.add(AddOperation::new( + commands.queue(AddOperation::new( scope, source, OperateCallback::new(self, target), diff --git a/src/cancel.rs b/src/cancel.rs index b255b545..35023ebf 100644 --- a/src/cancel.rs +++ b/src/cancel.rs @@ -379,7 +379,7 @@ pub fn try_emit_broken( world: &mut World, roster: &mut OperationRoster, ) { - if let Some(mut source_mut) = world.get_entity_mut(source) { + if let Ok(mut source_mut) = world.get_entity_mut(source) { source_mut.emit_broken(backtrace, roster); } else { world diff --git a/src/chain.rs b/src/chain.rs index ccc8f7a4..9d8ce81a 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -310,7 +310,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, OperateBufferAccess::::new(buffers, target), @@ -361,7 +361,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { where T: ToString, { - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), self.target, OperateCancel::::new(), @@ -460,7 +460,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, Spread::::new(target), @@ -496,7 +496,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, Collect::::new(target, min, max), @@ -533,7 +533,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, Trim::::new(branches, target), @@ -580,7 +580,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, OperateStaticGate::::new(buffers, target, action), @@ -627,7 +627,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { T: Splittable, { let source = self.target; - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, OperateSplit::::default(), @@ -698,7 +698,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, Noop::::new(target), @@ -773,7 +773,7 @@ where let target_ok = self.builder.commands.spawn(UnusedTarget).id(); let target_err = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, make_result_branching::(ForkTargetStorage::from_iter([target_ok, target_err])), @@ -837,7 +837,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateCancelFilter::on_err::(target), @@ -854,7 +854,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateCancelFilter::on_quiet_err::(target), @@ -876,7 +876,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_err::(target), @@ -893,7 +893,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_quiet_err::(target), @@ -909,7 +909,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_ok::(target), @@ -951,7 +951,7 @@ where let target_some = self.builder.commands.spawn(UnusedTarget).id(); let target_none = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, make_option_branching::(ForkTargetStorage::from_iter([target_some, target_none])), @@ -983,7 +983,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateCancelFilter::on_none::(target), @@ -1002,7 +1002,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_none::(target), @@ -1019,7 +1019,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_some::(target), @@ -1121,7 +1121,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, OperateDynamicGate::::new(buffers, target), @@ -1176,7 +1176,7 @@ impl<'w, 's, 'a, 'b> Chain<'w, 's, 'a, 'b, ()> { /// If you want to include information about the value that triggered the /// cancellation, use [`Self::then_cancel`]. pub fn then_quiet_cancel(self) { - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), self.target, OperateQuietCancel, diff --git a/src/chain/fork_clone_builder.rs b/src/chain/fork_clone_builder.rs index 2dc279c1..76fa824f 100644 --- a/src/chain/fork_clone_builder.rs +++ b/src/chain/fork_clone_builder.rs @@ -50,7 +50,7 @@ macro_rules! impl_forkclonebuilder_for_tuple { )* ]; - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(source.scope()), source.id(), ForkClone::::new( diff --git a/src/chain/premade.rs b/src/chain/premade.rs index e1a0d6f3..c1502c76 100644 --- a/src/chain/premade.rs +++ b/src/chain/premade.rs @@ -15,12 +15,26 @@ * */ -use bevy_ecs::{prelude::In, query::QueryEntityError}; +use bevy_ecs::{ + prelude::{Entity, In}, + query::QueryEntityError, +}; use smallvec::SmallVec; +use thiserror::Error; use crate::{BufferAccessMut, BufferKey}; +#[derive(Debug, Error)] +pub enum BufferAccessError { + #[error("The query does not match the entity {0}")] + QueryDoesNotMatch(Entity), + #[error("The entity {0} does not exist")] + NoSuchEntity(Entity), + #[error("The entity {0} was requested mutably more than once")] + AliasedMutability(Entity), +} + pub(super) fn consume_buffer( In(key): In>, mut access: BufferAccessMut, @@ -38,7 +52,14 @@ where pub fn push_into_buffer( In((input, key)): In<(T, BufferKey)>, mut access: BufferAccessMut, -) -> Result<(), QueryEntityError> { - access.get_mut(&key)?.push(input); +) -> Result<(), BufferAccessError> { + access + .get_mut(&key) + .map_err(|err| match err { + QueryEntityError::QueryDoesNotMatch(e, _) => BufferAccessError::QueryDoesNotMatch(e), + QueryEntityError::NoSuchEntity(e) => BufferAccessError::NoSuchEntity(e), + QueryEntityError::AliasedMutability(e) => BufferAccessError::AliasedMutability(e), + })? + .push(input); Ok(()) } diff --git a/src/chain/split.rs b/src/chain/split.rs index beb1c83e..48dbbe08 100644 --- a/src/chain/split.rs +++ b/src/chain/split.rs @@ -168,7 +168,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Splittable> SplitBuilder<'w, 's, 'a, 'b, T> { } let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(ConnectToSplit:: { + self.builder.commands.queue(ConnectToSplit:: { source: self.outputs.source, target, key, diff --git a/src/chain/unzip.rs b/src/chain/unzip.rs index 96a7bbc7..d16f7adb 100644 --- a/src/chain/unzip.rs +++ b/src/chain/unzip.rs @@ -61,7 +61,7 @@ macro_rules! impl_unzippable_for_tuple { )* ); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(output.scope()), output.id(), ForkUnzip::::new(ForkTargetStorage(targets)), @@ -79,7 +79,7 @@ macro_rules! impl_unzippable_for_tuple { let ($($D,)*) = world.get::(source).or_broken()?.0.iter().copied().next_tuple().or_broken()?; let ($($T,)*) = inputs; $( - if let Some(mut t_mut) = world.get_entity_mut($D) { + if let Ok(mut t_mut) = world.get_entity_mut($D) { t_mut.give_input(session, $T, roster)?; } )* diff --git a/src/channel.rs b/src/channel.rs index 758605d4..1abacf12 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -17,7 +17,8 @@ use bevy_ecs::{ prelude::{Entity, Resource, World}, - system::{CommandQueue, Commands}, + system::Commands, + world::CommandQueue, }; use tokio::sync::mpsc::{ @@ -223,7 +224,7 @@ mod tests { let count = context .app - .world + .world() .get::(hello.provider()) .unwrap() .0; @@ -231,7 +232,7 @@ mod tests { let count = context .app - .world + .world() .get::(repeat.provider()) .unwrap() .0; diff --git a/src/diagram.rs b/src/diagram.rs index 493fe735..7806a1ac 100644 --- a/src/diagram.rs +++ b/src/diagram.rs @@ -969,7 +969,7 @@ impl Diagram { /// "#; /// /// let diagram = Diagram::from_json_str(json_str)?; - /// let workflow = app.world.command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry))?; + /// let workflow = app.world_mut().command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry))?; /// # Ok::<_, Box>(()) /// ``` // TODO(koonpeng): Support streams other than `()` #43. @@ -1045,7 +1045,7 @@ impl Diagram { /// "#; /// /// let diagram = Diagram::from_json_str(json_str)?; - /// let workflow = app.world.command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry))?; + /// let workflow = app.world_mut().command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry))?; /// # Ok::<_, Box>(()) /// ``` pub fn spawn_io_workflow( diff --git a/src/diagram/registration.rs b/src/diagram/registration.rs index b52e1aa6..fe66cff2 100644 --- a/src/diagram/registration.rs +++ b/src/diagram/registration.rs @@ -153,7 +153,7 @@ impl DynOutput { }); } - builder.commands().add(Connect { + builder.commands().queue(Connect { original_target: self.id(), new_target: input.id(), }); diff --git a/src/diagram/section_schema.rs b/src/diagram/section_schema.rs index 798bac5e..2a2aa5c6 100644 --- a/src/diagram/section_schema.rs +++ b/src/diagram/section_schema.rs @@ -626,7 +626,7 @@ mod tests { .unwrap(); let mut context = TestingContext::minimal_plugins(); - let mut promise = context.app.world.command(|cmds| { + let mut promise = context.app.world_mut().command(|cmds| { let workflow = diagram .spawn_io_workflow::(cmds, ®istry) .unwrap(); @@ -662,7 +662,7 @@ mod tests { let mut context = TestingContext::minimal_plugins(); let err = context .app - .world + .world_mut() .command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry)) .unwrap_err(); let section_err = match err.code { @@ -707,7 +707,7 @@ mod tests { let mut context = TestingContext::minimal_plugins(); let err = context .app - .world + .world_mut() .command(|cmds| { diagram.spawn_io_workflow::(cmds, &fixture.registry) }) diff --git a/src/diagram/testing.rs b/src/diagram/testing.rs index 84f4b904..996777a2 100644 --- a/src/diagram/testing.rs +++ b/src/diagram/testing.rs @@ -40,7 +40,7 @@ impl DiagramTestFixture { { self.context .app - .world + .world_mut() .command(|cmds| diagram.spawn_workflow(cmds, &self.registry)) } diff --git a/src/disposal.rs b/src/disposal.rs index c27e7ebd..fe9cdb86 100644 --- a/src/disposal.rs +++ b/src/disposal.rs @@ -545,7 +545,7 @@ pub fn emit_disposal( world: &mut World, roster: &mut OperationRoster, ) { - if let Some(mut source_mut) = world.get_entity_mut(source) { + if let Ok(mut source_mut) = world.get_entity_mut(source) { source_mut.emit_disposal(session, disposal, roster); } else { world diff --git a/src/flush.rs b/src/flush.rs index 622c3113..67da1fbb 100644 --- a/src/flush.rs +++ b/src/flush.rs @@ -19,9 +19,10 @@ use bevy_derive::{Deref, DerefMut}; use bevy_ecs::{ prelude::{Added, Entity, Query, QueryState, Resource, With, World}, schedule::{IntoSystemConfigs, SystemConfigs}, - system::{Command, SystemState}, + system::SystemState, + world::Command, }; -use bevy_hierarchy::{BuildWorldChildren, Children, DespawnRecursiveExt}; +use bevy_hierarchy::{BuildChildren, Children, DespawnRecursiveExt}; use smallvec::SmallVec; @@ -88,7 +89,7 @@ fn flush_impulses_impl( let mut loop_count = 0; while !roster.is_empty() { for e in roster.deferred_despawn.drain(..) { - if let Some(e_mut) = world.get_entity_mut(e) { + if let Ok(e_mut) = world.get_entity_mut(e) { e_mut.despawn_recursive(); } } @@ -329,12 +330,12 @@ fn drop_target(target: Entity, world: &mut World, roster: &mut OperationRoster, } if let Some(detached_impulse) = detached_impulse { - if let Some(mut detached_impulse_mut) = world.get_entity_mut(detached_impulse) { + if let Ok(mut detached_impulse_mut) = world.get_entity_mut(detached_impulse) { detached_impulse_mut.remove_parent(); } } - if let Some(unused_target_mut) = world.get_entity_mut(target) { + if let Ok(unused_target_mut) = world.get_entity_mut(target) { unused_target_mut.despawn_recursive(); } diff --git a/src/impulse.rs b/src/impulse.rs index 77765ecd..1d34acde 100644 --- a/src/impulse.rs +++ b/src/impulse.rs @@ -79,7 +79,7 @@ where /// | [`Self::detach`]
[`Self::send_event`] | This will never be dropped | /// | Using none of the above | The impulse will immediately be dropped during a flush, so it will never be run at all.
This will also push an error into [`UnhandledErrors`](crate::UnhandledErrors). | pub fn detach(self) -> Impulse<'w, 's, 'a, Response, Streams> { - self.commands.add(Detach { + self.commands.queue(Detach { target: self.target, }); self @@ -90,7 +90,7 @@ where #[must_use] pub fn take(self) -> Recipient { let (response_sender, response_promise) = Promise::::new(); - self.commands.add(AddImpulse::new( + self.commands.queue(AddImpulse::new( self.target, TakenResponse::::new(response_sender), )); @@ -108,7 +108,7 @@ where /// Take only the response data that comes out of the request. pub fn take_response(self) -> Promise { let (response_sender, response_promise) = Promise::::new(); - self.commands.add(AddImpulse::new( + self.commands.queue(AddImpulse::new( self.target, TakenResponse::::new(response_sender), )); @@ -212,7 +212,7 @@ where /// [`Self::detach`] before calling this. pub fn store(self, target: Entity) { self.commands - .add(AddImpulse::new(self.target, Store::::new(target))); + .queue(AddImpulse::new(self.target, Store::::new(target))); let mut map = StreamTargetMap::default(); let stream_targets = Streams::collect_streams(self.source, target, &mut map, self.commands); @@ -249,7 +249,7 @@ where /// If the entity despawns then the request gets cancelled unless you used /// [`Self::detach`] before calling this. pub fn push(self, target: Entity) { - self.commands.add(AddImpulse::new( + self.commands.queue(AddImpulse::new( self.target, Push::::new(target, false), )); @@ -282,7 +282,7 @@ where /// [`Self::store`] or [`Self::push`]. Alternatively you can transform it /// into a bundle using [`Self::map_block`] or [`Self::map_async`]. pub fn insert(self, target: Entity) { - self.commands.add(AddImpulse::new( + self.commands.queue(AddImpulse::new( self.target, Insert::::new(target), )); @@ -299,7 +299,7 @@ where /// Using this will also effectively [detach](Self::detach) the impulse. pub fn send_event(self) { self.commands - .add(AddImpulse::new(self.target, SendEvent::::new())); + .queue(AddImpulse::new(self.target, SendEvent::::new())); } } @@ -337,7 +337,7 @@ impl Default for Collection { #[cfg(test)] mod tests { use crate::{prelude::*, testing::*, ContinuousQueueView}; - use bevy_utils::label::DynEq; + use bevy_ecs::label::DynEq; use smallvec::SmallVec; use std::{ sync::{Arc, Mutex}, diff --git a/src/impulse/detach.rs b/src/impulse/detach.rs index 88b01080..fd1c0470 100644 --- a/src/impulse/detach.rs +++ b/src/impulse/detach.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Component, Entity, World}, - system::Command, + world::Command, }; use anyhow::anyhow; @@ -44,7 +44,7 @@ pub(crate) struct Detach { impl Command for Detach { fn apply(self, world: &mut World) { let backtrace; - if let Some(mut session_mut) = world.get_entity_mut(self.target) { + if let Ok(mut session_mut) = world.get_entity_mut(self.target) { if let Some(mut detached) = session_mut.get_mut::() { detached.0 = true; session_mut.remove::(); diff --git a/src/impulse/insert.rs b/src/impulse/insert.rs index 6adb7474..d45293c1 100644 --- a/src/impulse/insert.rs +++ b/src/impulse/insert.rs @@ -51,7 +51,7 @@ impl Impulsive for Insert { let mut source_mut = world.get_entity_mut(source).or_broken()?; let Input { data, .. } = source_mut.take_input::()?; let target = source_mut.get::>().or_broken()?.target; - if let Some(mut target_mut) = world.get_entity_mut(target) { + if let Ok(mut target_mut) = world.get_entity_mut(target) { target_mut.insert(data); } diff --git a/src/impulse/internal.rs b/src/impulse/internal.rs index 6fd26fd1..a88db289 100644 --- a/src/impulse/internal.rs +++ b/src/impulse/internal.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Component, Entity, Resource, World}, - system::Command, + world::Command, }; use bevy_hierarchy::DespawnRecursiveExt; @@ -102,7 +102,7 @@ fn perform_impulse( // Do nothing } Err(OperationError::Broken(backtrace)) => { - if let Some(mut source_mut) = world.get_entity_mut(source) { + if let Ok(mut source_mut) = world.get_entity_mut(source) { source_mut.emit_broken(backtrace, roster); } else { world @@ -165,7 +165,7 @@ pub(crate) fn cancel_impulse( } } - if let Some(terminal_mut) = world.get_entity_mut(terminal) { + if let Ok(terminal_mut) = world.get_entity_mut(terminal) { terminal_mut.despawn_recursive(); } @@ -230,7 +230,7 @@ pub(crate) fn add_lifecycle_dependency(source: Entity, target: Entity, world: &m if let Some(mut lifecycle) = world.get_mut::(target) { lifecycle.sources.push(source); - } else if let Some(mut target_mut) = world.get_entity_mut(target) { + } else if let Ok(mut target_mut) = world.get_entity_mut(target) { target_mut.insert(ImpulseLifecycle::new(source, sender)); } else { // The target is already despawned diff --git a/src/impulse/store.rs b/src/impulse/store.rs index d1c0e634..0a8114fc 100644 --- a/src/impulse/store.rs +++ b/src/impulse/store.rs @@ -51,7 +51,7 @@ impl Impulsive for Store { let mut source_mut = world.get_entity_mut(source).or_broken()?; let Input { session, data } = source_mut.take_input::()?; let target = source_mut.get::>().or_broken()?.target; - if let Some(mut target_mut) = world.get_entity_mut(target) { + if let Ok(mut target_mut) = world.get_entity_mut(target) { target_mut.insert(Storage { data, session }); } world.entity_mut(source).despawn_recursive(); diff --git a/src/input.rs b/src/input.rs index b5b1ba9a..78224936 100644 --- a/src/input.rs +++ b/src/input.rs @@ -17,8 +17,7 @@ use bevy_ecs::{ prelude::{Bundle, Component, Entity}, - system::Command, - world::{EntityRef, EntityWorldMut, World}, + world::{Command, EntityRef, EntityWorldMut, World}, }; use smallvec::SmallVec; diff --git a/src/map.rs b/src/map.rs index ff9adf30..645786ef 100644 --- a/src/map.rs +++ b/src/map.rs @@ -137,7 +137,7 @@ where target: Entity, commands: &mut Commands, ) { - commands.add(AddOperation::new( + commands.queue(AddOperation::new( scope, source, OperateBlockingMap::new(target, self.def), @@ -280,7 +280,7 @@ where target: Entity, commands: &mut Commands, ) { - commands.add(AddOperation::new( + commands.queue(AddOperation::new( scope, source, OperateAsyncMap::new(target, self.def), diff --git a/src/map_once.rs b/src/map_once.rs index bd34d213..ea06f4ab 100644 --- a/src/map_once.rs +++ b/src/map_once.rs @@ -74,7 +74,7 @@ where type Streams = (); fn connect(self, _: Option, source: Entity, target: Entity, commands: &mut Commands) { - commands.add(AddImpulse::new( + commands.queue(AddImpulse::new( source, ImpulseBlockingMap::new(target, self.def), )); @@ -183,7 +183,7 @@ where type Streams = Streams; fn connect(self, _: Option, source: Entity, target: Entity, commands: &mut Commands) { - commands.add(AddImpulse::new( + commands.queue(AddImpulse::new( source, ImpulseAsyncMap::new(target, self.def), )); diff --git a/src/node.rs b/src/node.rs index 764aec23..a55f7aec 100644 --- a/src/node.rs +++ b/src/node.rs @@ -126,7 +126,7 @@ impl Output { Response: Clone, { assert_eq!(self.scope, builder.scope); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(self.scope), self.target, ForkClone::::new(ForkTargetStorage::new()), @@ -169,7 +169,7 @@ impl ForkCloneOutput { .commands .spawn((SingleInputStorage::new(self.id()), UnusedTarget)) .id(); - builder.commands.add(AddBranchToForkClone { + builder.commands.queue(AddBranchToForkClone { source: self.source, target, }); diff --git a/src/operation.rs b/src/operation.rs index 58953238..ed04ddb3 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -23,9 +23,9 @@ use crate::{ use bevy_derive::Deref; use bevy_ecs::{ prelude::{Component, Entity, World}, - system::Command, + world::Command, }; -use bevy_hierarchy::prelude::BuildWorldChildren; +use bevy_hierarchy::prelude::BuildChildren; use backtrace::Backtrace; @@ -658,7 +658,7 @@ pub fn execute_operation(request: OperationRequest) { // which end up getting dropped during a cleanup. In that case, the // source entity will be totally despawned, so check for that before // concluding that this is broken. - if request.world.get_entity(request.source).is_some() { + if request.world.get_entity(request.source).is_ok() { // The node does not have an operation and is not an unused target, // so this is broken somehow. request diff --git a/src/operation/fork_clone.rs b/src/operation/fork_clone.rs index 8b113eb0..58f0d67c 100644 --- a/src/operation/fork_clone.rs +++ b/src/operation/fork_clone.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Entity, World}, - system::Command, + world::Command, }; use anyhow::anyhow; diff --git a/src/operation/injection.rs b/src/operation/injection.rs index 20a3f3a0..1b720c36 100644 --- a/src/operation/injection.rs +++ b/src/operation/injection.rs @@ -26,7 +26,7 @@ use crate::{ use bevy_ecs::{ prelude::{Component, Entity}, - system::Command, + world::Command, }; use bevy_hierarchy::prelude::DespawnRecursiveExt; diff --git a/src/operation/operate_buffer.rs b/src/operation/operate_buffer.rs index 95d1d467..8f9811a4 100644 --- a/src/operation/operate_buffer.rs +++ b/src/operation/operate_buffer.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Bundle, Component, Entity, World}, - system::Command, + world::Command, }; use std::{collections::HashMap, sync::Arc}; @@ -229,7 +229,7 @@ impl Command for OnNewBufferValue { buffer_targets.0.push(self.buffer); - let Some(mut target_mut) = world.get_entity_mut(self.target) else { + let Ok(mut target_mut) = world.get_entity_mut(self.target) else { self.on_failure(world); return; }; diff --git a/src/operation/operate_service.rs b/src/operation/operate_service.rs index 9437fa0c..d4b8f19e 100644 --- a/src/operation/operate_service.rs +++ b/src/operation/operate_service.rs @@ -198,7 +198,7 @@ fn dispose_for_unavailable_service( roster: &mut OperationRoster, ) { let disposal = Disposal::service_unavailable(service, source); - if let Some(mut source_mut) = world.get_entity_mut(source) { + if let Ok(mut source_mut) = world.get_entity_mut(source) { while let Ok(Input { session, .. }) = source_mut.take_input::() { source_mut.emit_disposal(session, disposal.clone(), roster); } diff --git a/src/operation/operate_split.rs b/src/operation/operate_split.rs index da2dead8..9b043adf 100644 --- a/src/operation/operate_split.rs +++ b/src/operation/operate_split.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Component, Entity, World}, - system::Command, + world::Command, }; use smallvec::SmallVec; use std::{collections::HashMap, sync::Arc}; diff --git a/src/operation/operate_task.rs b/src/operation/operate_task.rs index 222a2b8a..20799b8e 100644 --- a/src/operation/operate_task.rs +++ b/src/operation/operate_task.rs @@ -17,9 +17,9 @@ use bevy_ecs::{ prelude::{Component, Entity, Resource, World}, - system::Command, + world::Command, }; -use bevy_hierarchy::{BuildWorldChildren, DespawnRecursiveExt}; +use bevy_hierarchy::{BuildChildren, DespawnRecursiveExt}; use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll}; @@ -382,7 +382,7 @@ fn cleanup_task( roster.unblock(unblock); } - if let Some(mut node_mut) = world.get_entity_mut(node) { + if let Ok(mut node_mut) = world.get_entity_mut(node) { if let Some(mut active_tasks) = node_mut.get_mut::() { let mut cleanup_ready = true; active_tasks.list.retain( @@ -420,7 +420,7 @@ fn cleanup_task( }; }; - if let Some(source_mut) = world.get_entity_mut(source) { + if let Ok(source_mut) = world.get_entity_mut(source) { source_mut.despawn_recursive(); } diff --git a/src/operation/scope.rs b/src/operation/scope.rs index f757653b..afa4bffb 100644 --- a/src/operation/scope.rs +++ b/src/operation/scope.rs @@ -370,7 +370,7 @@ where if result.is_err() { // We won't be executing this scope after all, so despawn the scoped // session that we created. - if let Some(scoped_session_mut) = world.get_entity_mut(scoped_session) { + if let Ok(scoped_session_mut) = world.get_entity_mut(scoped_session) { scoped_session_mut.despawn_recursive(); } return result; @@ -537,9 +537,9 @@ where // Note: We need to make sure the scope object gets set up before any of // its endpoints, otherwise the ScopeContents component will be missing // during setup. - commands.add(AddOperation::new(parent_scope, scope_id, scope)); + commands.queue(AddOperation::new(parent_scope, scope_id, scope)); - commands.add(AddOperation::new( + commands.queue(AddOperation::new( // We do not consider the terminal node to be "inside" the scope, // otherwise it will get cleaned up prematurely None, @@ -547,7 +547,7 @@ where Terminate::::new(scope_id), )); - commands.add(AddOperation::new( + commands.queue(AddOperation::new( // We do not consider the finish cancel node to be "inside" the // scope, otherwise it will get cleaned up prematurely None, @@ -1543,8 +1543,8 @@ impl FinishCleanup { clear_scope_buffers(scope, scoped_session, world)?; - if world.get_entity(scoped_session).is_some() { - if let Some(scoped_session_mut) = world.get_entity_mut(scoped_session) { + if world.get_entity(scoped_session).is_ok() { + if let Ok(scoped_session_mut) = world.get_entity_mut(scoped_session) { scoped_session_mut.despawn_recursive(); } } diff --git a/src/request.rs b/src/request.rs index 1fcab2e7..d309d24e 100644 --- a/src/request.rs +++ b/src/request.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Commands, World}, - system::CommandQueue, + world::CommandQueue, }; use bevy_hierarchy::BuildChildren; @@ -110,7 +110,7 @@ impl<'w, 's> RequestExt<'w, 's> for Commands<'w, 's> { .id(); provider.connect(None, source, target, self); - self.add(InputCommand { + self.queue(InputCommand { session: source, target: source, data: request, diff --git a/src/service.rs b/src/service.rs index f4b0a72c..e5de88e4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -23,11 +23,12 @@ use crate::{ use bevy_app::prelude::App; use bevy_derive::{Deref, DerefMut}; use bevy_ecs::{ + define_label, + intern::Interned, prelude::{Commands, Component, Entity, Event, World}, schedule::ScheduleLabel, }; pub use bevy_impulse_derive::DeliveryLabel; -use bevy_utils::{define_label, intern::Interned}; use std::{any::TypeId, collections::HashSet, sync::OnceLock}; use thiserror::Error as ThisError; @@ -218,7 +219,7 @@ define_label!( pub mod utils { /// Used by the procedural macro for DeliveryLabel - pub use bevy_utils::label::DynEq; + pub use bevy_ecs::label::DynEq; } /// When using a service, you can bundle in delivery instructions that affect @@ -586,7 +587,7 @@ where target: Entity, commands: &mut Commands, ) { - commands.add(AddOperation::new( + commands.queue(AddOperation::new( scope, source, OperateService::new(self, target), @@ -637,7 +638,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -648,7 +649,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -659,7 +660,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -672,7 +673,7 @@ mod tests { .add_systems(Update, sys_use_my_service_provider); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -683,7 +684,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -694,7 +695,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -707,7 +708,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } fn sys_async_service( @@ -807,9 +808,9 @@ mod tests { let mut recipient = context.command(|commands| commands.request((), event_streamer).take()); - context.app.world.send_event(CustomEvent(0)); - context.app.world.send_event(CustomEvent(1)); - context.app.world.send_event(CustomEvent(2)); + context.app.world_mut().send_event(CustomEvent(0)); + context.app.world_mut().send_event(CustomEvent(1)); + context.app.world_mut().send_event(CustomEvent(2)); context.run_with_conditions(&mut recipient.response, 1); diff --git a/src/service/async_srv.rs b/src/service/async_srv.rs index 3cdce5f2..36d40db6 100644 --- a/src/service/async_srv.rs +++ b/src/service/async_srv.rs @@ -39,17 +39,17 @@ pub trait IsAsyncService {} #[derive(Component)] struct AsyncServiceStorage( - Option, Task>>, + Option>, Task>>, ); #[derive(Component)] struct UninitAsyncServiceStorage( - BoxedSystem, Task>, + BoxedSystem>, Task>, ); impl IntoService<(Request, Streams, Task, M)> for Sys where - Sys: IntoSystem, Task, M>, + Sys: IntoSystem>, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -77,7 +77,7 @@ where impl IsAsyncService<(Request, Streams, Task, M)> for Sys where - Sys: IntoSystem, Task, M>, + Sys: IntoSystem>, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -151,7 +151,7 @@ where for cancelled in cancelled { let disposal = Disposal::supplanted(cancelled.source, source, session); emit_disposal(cancelled.source, cancelled.session, disposal, world, roster); - if let Some(task_mut) = world.get_entity_mut(cancelled.task_id) { + if let Ok(task_mut) = world.get_entity_mut(cancelled.task_id) { task_mut.despawn_recursive(); } } @@ -244,7 +244,7 @@ where roster, }, } = cmd; - let mut service = if let Some(mut provider_mut) = world.get_entity_mut(provider) { + let mut service = if let Ok(mut provider_mut) = world.get_entity_mut(provider) { if let Some(mut storage) = provider_mut.get_mut::>() { @@ -406,7 +406,7 @@ pub trait IntoAsyncService { impl IntoAsyncService> for Sys where - Sys: IntoSystem, + Sys: IntoSystem, Response, M>, Request: 'static + Send, Response: 'static + Send, { @@ -418,7 +418,7 @@ where impl IntoService<(Request, Task, M)> for AsAsyncService where - Sys: IntoSystem, + Sys: IntoSystem, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -441,7 +441,7 @@ where impl IsAsyncService<(Request, Task, M)> for AsAsyncService where - Sys: IntoSystem, + Sys: IntoSystem, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, diff --git a/src/service/blocking.rs b/src/service/blocking.rs index 1f10bcf0..ab342f88 100644 --- a/src/service/blocking.rs +++ b/src/service/blocking.rs @@ -32,18 +32,18 @@ pub struct Blocking(std::marker::PhantomData); #[derive(Component)] struct BlockingServiceStorage( - Option, Response>>, + Option>, Response>>, ); #[derive(Component)] struct UninitBlockingServiceStorage( - BoxedSystem, Response>, + BoxedSystem>, Response>, ); impl IntoService> for Sys where - Sys: IntoSystem, Response, M>, + Sys: IntoSystem>, Response, M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamPack, @@ -97,7 +97,7 @@ where .or_broken()? .take_input::()?; - let mut service = if let Some(mut provider_mut) = world.get_entity_mut(provider) { + let mut service = if let Ok(mut provider_mut) = world.get_entity_mut(provider) { if let Some(mut storage) = provider_mut.get_mut::>() { @@ -146,7 +146,7 @@ where let mut unused_streams = UnusedStreams::new(source); Streams::process_buffer(streams, source, session, &mut unused_streams, world, roster)?; - if let Some(mut provider_mut) = world.get_entity_mut(provider) { + if let Ok(mut provider_mut) = world.get_entity_mut(provider) { if let Some(mut storage) = provider_mut.get_mut::>() { @@ -191,7 +191,7 @@ pub trait IntoBlockingService { impl IntoBlockingService> for Sys where - Sys: IntoSystem, + Sys: IntoSystem, Response, M>, Request: 'static, Response: 'static, { @@ -204,7 +204,7 @@ where impl IntoService> for AsBlockingService where - Sys: IntoSystem, + Sys: IntoSystem, Response, M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, { diff --git a/src/service/continuous.rs b/src/service/continuous.rs index af394f2e..d2dac9f7 100644 --- a/src/service/continuous.rs +++ b/src/service/continuous.rs @@ -18,10 +18,10 @@ use bevy_ecs::{ prelude::{Commands, Component, Entity, Event, EventReader, In, Local, Query, World}, schedule::IntoSystemConfigs, - system::{Command, IntoSystem, SystemParam}, - world::EntityWorldMut, + system::{IntoSystem, SystemParam}, + world::{Command, EntityWorldMut}, }; -use bevy_hierarchy::prelude::{BuildWorldChildren, DespawnRecursiveExt}; +use bevy_hierarchy::prelude::{BuildChildren, DespawnRecursiveExt}; use smallvec::SmallVec; @@ -476,7 +476,7 @@ where if !responses.is_empty() { self.commands - .add(DeliverResponses:: { + .queue(DeliverResponses:: { responses, _ignore: Default::default(), }); @@ -648,7 +648,7 @@ where } } - if let Some(task_mut) = world.get_entity_mut(task_id) { + if let Ok(task_mut) = world.get_entity_mut(task_id) { task_mut.despawn_recursive(); } } @@ -778,7 +778,7 @@ where for cancelled in cancelled { let disposal = Disposal::supplanted(cancelled.source, source, session); emit_disposal(cancelled.source, cancelled.session, disposal, world, roster); - if let Some(task_mut) = world.get_entity_mut(cancelled.task_id) { + if let Ok(task_mut) = world.get_entity_mut(cancelled.task_id) { task_mut.despawn_recursive(); } } @@ -815,7 +815,7 @@ where let disposal = Disposal::supplanted(stop.source, source, session); emit_disposal(stop.source, stop.session, disposal, world, roster); - if let Some(task_mut) = world.get_entity_mut(stop.task_id) { + if let Ok(task_mut) = world.get_entity_mut(stop.task_id) { task_mut.despawn_recursive(); } } @@ -942,7 +942,7 @@ fn serve_next_continuous_request( impl IntoContinuousService<(Request, Response, Streams, M)> for Sys where - Sys: IntoSystem, (), M>, + Sys: IntoSystem>, (), M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamPack, diff --git a/src/service/discovery.rs b/src/service/discovery.rs index bb2262c2..4d804868 100644 --- a/src/service/discovery.rs +++ b/src/service/discovery.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Entity, Query, With}, - query::{QueryEntityError, QueryIter, ReadOnlyWorldQuery}, + query::{QueryEntityError, QueryFilter, QueryIter}, system::SystemParam, }; @@ -36,7 +36,7 @@ where Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamFilter + 'static, - Filter: ReadOnlyWorldQuery + 'static, + Filter: QueryFilter + 'static, { query: Query< 'w, @@ -56,7 +56,7 @@ where Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamFilter, - Filter: ReadOnlyWorldQuery + 'static, + Filter: QueryFilter + 'static, { pub fn iter(&self) -> IterServiceDiscovery<'_, 's, Request, Response, Streams, Filter> { IterServiceDiscovery { @@ -78,7 +78,7 @@ where Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamFilter, - Filter: ReadOnlyWorldQuery + 'static, + Filter: QueryFilter + 'static, { inner: QueryIter< 'w, @@ -98,7 +98,7 @@ where Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamFilter, - Filter: ReadOnlyWorldQuery + 'static, + Filter: QueryFilter + 'static, { type Item = Service; diff --git a/src/service/service_builder.rs b/src/service/service_builder.rs index 8d3a4297..b49fa1ee 100644 --- a/src/service/service_builder.rs +++ b/src/service/service_builder.rs @@ -130,7 +130,7 @@ impl ServiceBuilder { Srv::Response: 'static + Send + Sync, Srv::Streams: StreamPack, { - let mut entity_mut = app.world.spawn(()); + let mut entity_mut = app.world_mut().spawn(()); self.service.insert_service_mut(&mut entity_mut); let service = Service::::new(entity_mut.id()); entity_mut.insert(::StreamAvailableBundle::default()); @@ -161,7 +161,7 @@ where Srv::Response: 'static + Send + Sync, Srv::Streams: StreamPack, { - let mut entity_mut = app.world.spawn(()); + let mut entity_mut = app.world_mut().spawn(()); let provider = entity_mut.id(); let config = self.service.into_system_config(&mut entity_mut); let config = self.configure.apply(config); diff --git a/src/service/workflow.rs b/src/service/workflow.rs index 647c6820..3e21cc22 100644 --- a/src/service/workflow.rs +++ b/src/service/workflow.rs @@ -139,7 +139,7 @@ where ); if result.is_err() { - if let Some(scoped_session_mut) = world.get_entity_mut(scoped_session) { + if let Ok(scoped_session_mut) = world.get_entity_mut(scoped_session) { scoped_session_mut.despawn_recursive(); } } @@ -336,7 +336,7 @@ fn serve_next_workflow_request( .is_err() { // The workflow will not run, so we should despawn the scoped session - if let Some(scoped_session_mut) = world.get_entity_mut(scoped_session) { + if let Ok(scoped_session_mut) = world.get_entity_mut(scoped_session) { scoped_session_mut.despawn_recursive(); } diff --git a/src/stream.rs b/src/stream.rs index 5fe97060..2ef57fb4 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -18,8 +18,8 @@ use bevy_derive::{Deref, DerefMut}; use bevy_ecs::{ prelude::{Bundle, Commands, Component, Entity, With, World}, - query::{ReadOnlyWorldQuery, WorldQuery}, - system::Command, + query::{QueryFilter, ReadOnlyQueryData, WorldQuery}, + world::Command, }; use bevy_hierarchy::BuildChildren; pub use bevy_impulse_derive::Stream; @@ -78,7 +78,7 @@ pub trait Stream: 'static + Send + Sync + Sized { ) -> (InputSlot, Output) { let source = commands.spawn(()).id(); let target = commands.spawn(UnusedTarget).id(); - commands.add(AddOperation::new( + commands.queue(AddOperation::new( Some(in_scope), source, RedirectScopeStream::::new(target), @@ -92,7 +92,7 @@ pub trait Stream: 'static + Send + Sync + Sized { fn spawn_workflow_stream(builder: &mut Builder) -> InputSlot { let source = builder.commands.spawn(()).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(builder.scope()), source, RedirectWorkflowStream::::new(), @@ -142,7 +142,7 @@ pub trait Stream: 'static + Send + Sync + Sized { let index = map.add(target); - commands.add(AddImpulse::new(target, TakenStream::new(sender))); + commands.queue(AddImpulse::new(target, TakenStream::new(sender))); (StreamTargetStorage::new(index), receiver) } @@ -154,7 +154,7 @@ pub trait Stream: 'static + Send + Sync + Sized { commands: &mut Commands, ) -> StreamTargetStorage { let redirect = commands.spawn(()).set_parent(source).id(); - commands.add(AddImpulse::new(redirect, Push::::new(target, true))); + commands.queue(AddImpulse::new(redirect, Push::::new(target, true))); let index = map.add(redirect); StreamTargetStorage::new(index) } @@ -346,7 +346,7 @@ impl StreamTargetMap { /// streams to be packed together as one generic argument. pub trait StreamPack: 'static + Send + Sync { type StreamAvailableBundle: Bundle + Default; - type StreamFilter: ReadOnlyWorldQuery; + type StreamFilter: QueryFilter; type StreamStorageBundle: Bundle + Clone; type StreamInputPack; type StreamOutputPack; @@ -354,7 +354,7 @@ pub trait StreamPack: 'static + Send + Sync { type Channel: Send; type Forward: Future + Send; type Buffer: Clone; - type TargetIndexQuery: ReadOnlyWorldQuery; + type TargetIndexQuery: ReadOnlyQueryData; fn spawn_scope_streams( in_scope: Entity, @@ -535,7 +535,7 @@ impl StreamPack for T { session: Entity, commands: &mut Commands, ) { - commands.add(SendStreams:: { + commands.queue(SendStreams:: { source, session, container: buffer.container.take(), @@ -936,7 +936,7 @@ impl Command for SendStreams { /// } /// ``` pub trait StreamFilter { - type Filter: ReadOnlyWorldQuery; + type Filter: QueryFilter; type Pack: StreamPack; } diff --git a/src/testing.rs b/src/testing.rs index f7ba4664..6cf878bc 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -20,7 +20,8 @@ pub use bevy_app::{App, Update}; use bevy_core::{FrameCountPlugin, TaskPoolPlugin, TypeRegistrationPlugin}; pub use bevy_ecs::{ prelude::{Commands, Component, Entity, In, Local, Query, ResMut, Resource, World}, - system::{CommandQueue, IntoSystem}, + system::IntoSystem, + world::CommandQueue, }; use bevy_time::TimePlugin; @@ -63,13 +64,13 @@ impl TestingContext { pub fn set_flush_loop_limit(&mut self, limit: Option) { self.app - .world + .world_mut() .get_resource_or_insert_with(FlushParameters::default) .flush_loop_limit = limit; } pub fn command(&mut self, f: impl FnOnce(&mut Commands) -> U) -> U { - self.app.world.command(f) + self.app.world_mut().command(f) } /// Build a simple workflow with a single input and output, and no streams @@ -148,7 +149,7 @@ impl TestingContext { } pub fn no_unhandled_errors(&self) -> bool { - let Some(errors) = self.app.world.get_resource::() else { + let Some(errors) = self.app.world().get_resource::() else { return true; }; @@ -156,20 +157,23 @@ impl TestingContext { } pub fn get_unhandled_errors(&self) -> Option<&UnhandledErrors> { - self.app.world.get_resource::() + self.app.world().get_resource::() } // Check that all buffers in the world are empty pub fn confirm_buffers_empty(&mut self) -> Result<(), Vec> { - let mut query = self.app.world.query::<(Entity, &GetBufferedSessionsFn)>(); + let mut query = self + .app + .world_mut() + .query::<(Entity, &GetBufferedSessionsFn)>(); let buffers: Vec<_> = query - .iter(&self.app.world) + .iter(self.app.world()) .map(|(e, get_sessions)| (e, get_sessions.0)) .collect(); let mut non_empty_buffers = Vec::new(); for (e, get_sessions) in buffers { - if !get_sessions(e, &self.app.world).is_ok_and(|s| s.is_empty()) { + if !get_sessions(e, self.app.world()).is_ok_and(|s| s.is_empty()) { non_empty_buffers.push(e); } } diff --git a/src/workflow.rs b/src/workflow.rs index 29cd3581..6e521eae 100644 --- a/src/workflow.rs +++ b/src/workflow.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Commands, World}, - system::CommandQueue, + world::CommandQueue, }; use bevy_hierarchy::BuildChildren;