diff --git a/examples/avian_3d_character/Cargo.toml b/examples/avian_3d_character/Cargo.toml index 8bb81d8d..be730dc7 100644 --- a/examples/avian_3d_character/Cargo.toml +++ b/examples/avian_3d_character/Cargo.toml @@ -14,26 +14,26 @@ lightyear_examples_common = { path = "../common" } bevy_screen_diagnostics = "0.6" leafwing-input-manager = "0.15" avian3d = { version = "0.1.1", default-features = false, features = [ - "3d", - "f32", - "parry-f32", - "parallel", - "serialize", + "3d", + "f32", + "parry-f32", + "parallel", + "serialize", ] } lightyear = { path = "../../lightyear", features = [ - "webtransport", - "websocket", - "leafwing", - "avian3d", + "webtransport", + "websocket", + "leafwing", + "avian3d", ] } serde = { version = "1.0.188", features = ["derive"] } anyhow = { version = "1.0.75", features = [] } tracing = "0.1" tracing-subscriber = "0.3.17" bevy = { version = "0.14", features = [ - "multi_threaded", - "bevy_state", - "serialize", + "multi_threaded", + "bevy_state", + "serialize", ] } rand = "0.8.1" diff --git a/examples/avian_3d_character/assets/settings.ron b/examples/avian_3d_character/assets/settings.ron index 61828849..6a3dea07 100644 --- a/examples/avian_3d_character/assets/settings.ron +++ b/examples/avian_3d_character/assets/settings.ron @@ -1,6 +1,6 @@ MySettings( server_replication_send_interval: 50, - input_delay_ticks: 4, + input_delay_ticks: 6, // do not set a limit on the amount of prediction max_prediction_ticks: 100, correction_ticks_factor: 2.0, diff --git a/lightyear/src/client/prediction/mod.rs b/lightyear/src/client/prediction/mod.rs index e9d94622..5b816459 100644 --- a/lightyear/src/client/prediction/mod.rs +++ b/lightyear/src/client/prediction/mod.rs @@ -10,6 +10,7 @@ pub mod pre_prediction; pub mod predicted_history; pub mod prespawn; pub(crate) mod resource; +pub mod resource_history; pub mod rollback; pub mod spawn; diff --git a/lightyear/src/client/prediction/plugin.rs b/lightyear/src/client/prediction/plugin.rs index 75bc8152..d401daf6 100644 --- a/lightyear/src/client/prediction/plugin.rs +++ b/lightyear/src/client/prediction/plugin.rs @@ -1,6 +1,6 @@ use bevy::prelude::{ not, App, Component, Condition, FixedPostUpdate, IntoSystemConfigs, IntoSystemSetConfigs, - Plugin, PostUpdate, PreUpdate, Res, SystemSet, + Plugin, PostUpdate, PreUpdate, Res, Resource, SystemSet, }; use bevy::reflect::Reflect; use bevy::transform::TransformSystem; @@ -29,9 +29,10 @@ use crate::shared::sets::{ClientMarker, InternalMainSet}; use super::pre_prediction::PrePredictionPlugin; use super::predicted_history::{add_component_history, apply_confirmed_update}; +use super::resource_history::update_resource_history; use super::rollback::{ check_rollback, increment_rollback_tick, prepare_rollback, prepare_rollback_non_networked, - prepare_rollback_prespawn, run_rollback, Rollback, RollbackState, + prepare_rollback_prespawn, prepare_rollback_resource, run_rollback, Rollback, RollbackState, }; use super::spawn::spawn_predicted_entity; @@ -199,6 +200,17 @@ pub fn add_non_networked_rollback_systems(app: ); } +pub fn add_resource_rollback_systems(app: &mut App) { + app.add_systems( + PreUpdate, + prepare_rollback_resource::.in_set(PredictionSet::PrepareRollback), + ); + app.add_systems( + FixedPostUpdate, + update_resource_history::.in_set(PredictionSet::UpdateHistory), + ); +} + pub fn add_prediction_systems(app: &mut App, prediction_mode: ComponentSyncMode) { app.add_systems( PreUpdate, diff --git a/lightyear/src/client/prediction/resource_history.rs b/lightyear/src/client/prediction/resource_history.rs new file mode 100644 index 00000000..9e7ed5b9 --- /dev/null +++ b/lightyear/src/client/prediction/resource_history.rs @@ -0,0 +1,262 @@ +//! There's a lot of overlap with `client::prediction_history` because resources are components in ECS so rollback is going to look similar. +use bevy::prelude::*; + +use crate::{ + prelude::{Tick, TickManager}, + utils::ready_buffer::ReadyBuffer, +}; + +use super::rollback::Rollback; + +/// Stores a past update for a resource +#[derive(Debug, PartialEq, Clone)] +pub(crate) enum ResourceState { + /// the resource just got removed + Removed, + /// the resource got updated + Updated(R), +} + +/// To know if we need to do rollback, we need to compare the resource's history with the server's state updates +#[derive(Resource, Debug)] +pub(crate) struct ResourceHistory { + // We will only store the history for the ticks where the resource got updated + pub buffer: ReadyBuffer>, +} + +impl Default for ResourceHistory { + fn default() -> Self { + Self { + buffer: ReadyBuffer::new(), + } + } +} + +impl PartialEq for ResourceHistory { + fn eq(&self, other: &Self) -> bool { + let mut self_history: Vec<_> = self.buffer.heap.iter().collect(); + let mut other_history: Vec<_> = other.buffer.heap.iter().collect(); + self_history.sort_by_key(|item| item.key); + other_history.sort_by_key(|item| item.key); + self_history.eq(&other_history) + } +} + +impl ResourceHistory { + /// Reset the history for this resource + pub(crate) fn clear(&mut self) { + self.buffer = ReadyBuffer::new(); + } + + /// Add to the buffer that we received an update for the resource at the given tick + pub(crate) fn add_update(&mut self, tick: Tick, resource: R) { + self.buffer.push(tick, ResourceState::Updated(resource)); + } + + /// Add to the buffer that the resource got removed at the given tick + pub(crate) fn add_remove(&mut self, tick: Tick) { + self.buffer.push(tick, ResourceState::Removed); + } + + // TODO: check if this logic is necessary/correct? + /// Clear the history of values strictly older than the specified tick, + /// and return the most recent value that is older or equal to the specified tick. + /// NOTE: That value is written back into the buffer + /// + /// CAREFUL: + /// the resource history will only contain the ticks where the resource got updated, and otherwise + /// contains gaps. Therefore, we need to always leave a value in the history buffer so that we can + /// get the values for the future ticks + pub(crate) fn pop_until_tick(&mut self, tick: Tick) -> Option> { + self.buffer.pop_until(&tick).map(|(tick, state)| { + // TODO: this clone is pretty bad and avoidable. Probably switch to a sequence buffer? + self.buffer.push(tick, state.clone()); + state + }) + } +} + +/// This system handles changes and removals of resources +pub(crate) fn update_resource_history( + resource: Option>, + mut history: ResMut>, + tick_manager: Res, + rollback: Res, +) { + // tick for which we will record the history (either the current client tick or the current rollback tick) + let tick = tick_manager.tick_or_rollback_tick(rollback.as_ref()); + + if let Some(resource) = resource { + if resource.is_changed() { + history.add_update(tick, resource.clone()); + } + // resource does not exist, it might have been just removed + } else { + match history.buffer.peek_max_item() { + Some((_, ResourceState::Removed)) => (), + // if there is no latest item or the latest item isn't a removal then the resource just got removed. + _ => history.add_remove(tick), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::prelude::client::RollbackState; + use crate::prelude::AppComponentExt; + use crate::tests::stepper::BevyStepper; + use crate::utils::ready_buffer::ItemWithReadyKey; + use bevy::ecs::system::RunSystemOnce; + + #[derive(Resource, Clone, PartialEq, Debug)] + struct TestResource(f32); + + /// Test adding and removing updates to the resource history + #[test] + fn test_resource_history() { + let mut resource_history = ResourceHistory::::default(); + + // check when we try to access a value when the buffer is empty + assert_eq!(resource_history.pop_until_tick(Tick(0)), None); + + // check when we try to access an exact tick + resource_history.add_update(Tick(1), TestResource(1.0)); + resource_history.add_update(Tick(2), TestResource(2.0)); + assert_eq!( + resource_history.pop_until_tick(Tick(2)), + Some(ResourceState::Updated(TestResource(2.0))) + ); + // check that we cleared older ticks, and that the most recent value still remains + assert_eq!(resource_history.buffer.len(), 1); + assert!(resource_history.buffer.has_item(&Tick(2))); + + // check when we try to access a value in-between ticks + resource_history.add_update(Tick(4), TestResource(4.0)); + // we retrieve the most recent value older or equal to Tick(3) + assert_eq!( + resource_history.pop_until_tick(Tick(3)), + Some(ResourceState::Updated(TestResource(2.0))) + ); + assert_eq!(resource_history.buffer.len(), 2); + // check that the most recent value got added back to the buffer at the popped tick + assert_eq!( + resource_history.buffer.heap.peek(), + Some(&ItemWithReadyKey { + key: Tick(2), + item: ResourceState::Updated(TestResource(2.0)) + }) + ); + assert!(resource_history.buffer.has_item(&Tick(4))); + + // check that nothing happens when we try to access a value before any ticks + assert_eq!(resource_history.pop_until_tick(Tick(0)), None); + assert_eq!(resource_history.buffer.len(), 2); + + resource_history.add_remove(Tick(5)); + assert_eq!(resource_history.buffer.len(), 3); + + resource_history.clear(); + assert_eq!(resource_history.buffer.len(), 0); + } + + /// Test that the history gets updated correctly + /// 1. Updating the TestResource resource + /// 2. Removing the TestResource resource + /// 3. Updating the TestResource resource during rollback + /// 4. Removing the TestResource resource during rollback + #[test] + fn test_update_history() { + let mut stepper = BevyStepper::default(); + stepper.client_app.add_resource_rollback::(); + + // 1. Updating TestResource resource + stepper + .client_app + .world_mut() + .insert_resource(TestResource(1.0)); + stepper.frame_step(); + stepper + .client_app + .world_mut() + .resource_mut::() + .0 = 2.0; + stepper.frame_step(); + let tick = stepper.client_tick(); + assert_eq!( + stepper + .client_app + .world_mut() + .get_resource_mut::>() + .expect("Expected resource history to be added") + .pop_until_tick(tick), + Some(ResourceState::Updated(TestResource(2.0))), + "Expected resource value to be updated in resource history" + ); + + // 2. Removing TestResource + stepper + .client_app + .world_mut() + .remove_resource::(); + stepper.frame_step(); + let tick = stepper.client_tick(); + assert_eq!( + stepper + .client_app + .world_mut() + .get_resource_mut::>() + .expect("Expected resource history to be added") + .pop_until_tick(tick), + Some(ResourceState::Removed), + "Expected resource value to be removed in resource history" + ); + + // 3. Updating TestResource during rollback + let rollback_tick = Tick(10); + stepper + .client_app + .world_mut() + .insert_resource(Rollback::new(RollbackState::ShouldRollback { + current_tick: rollback_tick, + })); + stepper + .client_app + .world_mut() + .insert_resource(TestResource(3.0)); + stepper + .client_app + .world_mut() + .run_system_once(update_resource_history::); + assert_eq!( + stepper + .client_app + .world_mut() + .get_resource_mut::>() + .expect("Expected resource history to be added") + .pop_until_tick(rollback_tick), + Some(ResourceState::Updated(TestResource(3.0))), + "Expected resource value to be updated in resource history" + ); + + // 4. Removing TestResource during rollback + stepper + .client_app + .world_mut() + .remove_resource::(); + stepper + .client_app + .world_mut() + .run_system_once(update_resource_history::); + assert_eq!( + stepper + .client_app + .world_mut() + .get_resource_mut::>() + .expect("Expected resource history to be added") + .pop_until_tick(rollback_tick), + Some(ResourceState::Removed), + "Expected resource value to be removed from resource history" + ); + } +} diff --git a/lightyear/src/client/prediction/rollback.rs b/lightyear/src/client/prediction/rollback.rs index 415c07d0..201f276a 100644 --- a/lightyear/src/client/prediction/rollback.rs +++ b/lightyear/src/client/prediction/rollback.rs @@ -22,6 +22,7 @@ use crate::client::prediction::resource::PredictionManager; use crate::prelude::{ComponentRegistry, PreSpawnedPlayerObject, Tick, TickManager}; use super::predicted_history::PredictionHistory; +use super::resource_history::{ResourceHistory, ResourceState}; use super::Predicted; /// Resource that indicates whether we are in a rollback state or not @@ -490,7 +491,7 @@ pub(crate) fn prepare_rollback_non_networked( match history.pop_until_tick(rollback_tick) { None | Some(ComponentState::Removed) => { if component.is_some() { - debug!(?entity, ?kind, "Non-netorked component for predicted entity didn't exist at time of rollback, removing it"); + debug!(?entity, ?kind, "Non-networked component for predicted entity didn't exist at time of rollback, removing it"); // the component didn't exist at the time, remove it! commands.entity(entity).remove::(); } @@ -516,6 +517,57 @@ pub(crate) fn prepare_rollback_non_networked( } } +// Revert `resource` to its value at the tick that the incoming rollback will rollback to. +pub(crate) fn prepare_rollback_resource( + mut commands: Commands, + tick_manager: Res, + rollback: Res, + resource: Option>, + mut history: ResMut>, +) { + let kind = std::any::type_name::(); + let _span = trace_span!("client prepare rollback for resource", ?kind); + + let current_tick = tick_manager.tick(); + let Some(rollback_tick_plus_one) = rollback.get_rollback_tick() else { + error!("prepare_rollback_resource should only be called when we are in rollback"); + return; + }; + + // careful, the current_tick is already incremented by 1 in the check_rollback stage... + let rollback_tick = rollback_tick_plus_one - 1; + + // 1. restore the resource to the historical value + match history.pop_until_tick(rollback_tick) { + None | Some(ResourceState::Removed) => { + if resource.is_some() { + debug!( + ?kind, + "Resource didn't exist at time of rollback, removing it" + ); + // the resource didn't exist at the time, remove it! + commands.remove_resource::(); + } + } + Some(ResourceState::Updated(r)) => { + // the resource existed at the time, restore it! + if let Some(mut resource) = resource { + // update the resource to the corrected value + *resource = r.clone(); + } else { + debug!( + ?kind, + "Resource for predicted entity existed at time of rollback, inserting it" + ); + commands.insert_resource(r); + } + } + } + + // 2. we need to clear the history so we can write a new one + history.clear(); +} + pub(crate) fn run_rollback(world: &mut World) { let tick_manager = world.get_resource::().unwrap(); let rollback = world.get_resource::().unwrap(); diff --git a/lightyear/src/protocol/component.rs b/lightyear/src/protocol/component.rs index e4f01b42..366954bf 100644 --- a/lightyear/src/protocol/component.rs +++ b/lightyear/src/protocol/component.rs @@ -17,8 +17,9 @@ use crate::client::components::ComponentSyncMode; use crate::client::config::ClientConfig; use crate::client::interpolation::{add_interpolation_systems, add_prepare_interpolation_systems}; use crate::client::prediction::plugin::{ - add_non_networked_rollback_systems, add_prediction_systems, + add_non_networked_rollback_systems, add_prediction_systems, add_resource_rollback_systems, }; +use crate::client::prediction::resource_history::ResourceHistory; use crate::prelude::client::SyncComponent; use crate::prelude::server::ServerConfig; use crate::prelude::{ChannelDirection, Message, Tick}; @@ -913,6 +914,9 @@ pub trait AppComponentExt { /// Enable rollbacks for a component even if the component is not networked fn add_rollback(&mut self); + /// Enable rollbacks for a resource. + fn add_resource_rollback(&mut self); + /// Enable prediction systems for this component. /// You can specify the prediction [`ComponentSyncMode`] fn add_prediction(&mut self, prediction_mode: ComponentSyncMode); @@ -1104,6 +1108,14 @@ impl AppComponentExt for App { } } + fn add_resource_rollback(&mut self) { + let is_client = self.world().get_resource::().is_some(); + if is_client { + self.insert_resource(ResourceHistory::::default()); + add_resource_rollback_systems::(self); + } + } + fn add_prediction(&mut self, prediction_mode: ComponentSyncMode) { let mut registry = self.world_mut().resource_mut::(); registry.set_prediction_mode::(prediction_mode); diff --git a/lightyear/src/utils/ready_buffer.rs b/lightyear/src/utils/ready_buffer.rs index 678f53f4..ec42caa5 100644 --- a/lightyear/src/utils/ready_buffer.rs +++ b/lightyear/src/utils/ready_buffer.rs @@ -15,7 +15,7 @@ pub struct ReadyBuffer { pub heap: BinaryHeap>, } -impl Default for ReadyBuffer { +impl Default for ReadyBuffer { fn default() -> Self { Self { heap: BinaryHeap::default(), @@ -23,7 +23,7 @@ impl Default for ReadyBuffer { } } -impl ReadyBuffer { +impl ReadyBuffer { pub fn new() -> Self { Self { heap: BinaryHeap::default(), @@ -31,12 +31,18 @@ impl ReadyBuffer { } } -impl ReadyBuffer { +impl ReadyBuffer { /// Adds an item to the heap marked by time pub fn push(&mut self, key: K, item: T) { self.heap.push(ItemWithReadyKey { key, item }); } + /// Returns a reference to the item with the highest `K` value or None if the queue is empty. + /// Does not remove the item from the queue. + pub fn peek_max_item(&self) -> Option<(K, &T)> { + self.heap.peek().map(|item| (item.key.clone(), &item.item)) + } + /// Returns whether or not there is an item with a key more recent or equal to `current_key` /// that is ready to be returned (i.e. we are beyond the instant associated with the item) pub fn has_item(&self, current_key: &K) -> bool { @@ -160,15 +166,15 @@ pub struct ItemWithReadyKey { pub item: T, } -impl Eq for ItemWithReadyKey {} +impl Eq for ItemWithReadyKey {} -impl PartialEq for ItemWithReadyKey { +impl PartialEq for ItemWithReadyKey { fn eq(&self, other: &Self) -> bool { - self.item == other.item && self.key == other.key + self.key == other.key } } -impl PartialOrd for ItemWithReadyKey { +impl PartialOrd for ItemWithReadyKey { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } @@ -176,7 +182,7 @@ impl PartialOrd for ItemWithReadyKey { /// BinaryHeap is a max-heap, so we must reverse the ordering of the Instants /// to get a min-heap -impl Ord for ItemWithReadyKey { +impl Ord for ItemWithReadyKey { fn cmp(&self, other: &ItemWithReadyKey) -> Ordering { other.key.cmp(&self.key) } @@ -192,6 +198,30 @@ mod tests { use super::*; + #[test] + fn test_peek_max_item() { + let mut heap = ReadyBuffer::::new(); + + // no item in the heap means no max item + assert_eq!(heap.peek_max_item(), None); + + // heap with one item should return that item + heap.push(Tick(1), 38); + matches!(heap.peek_max_item(), Some((Tick(1), 38))); + + // heap's max item should change to new item since it is greater than the current items + heap.push(Tick(3), 24); + matches!(heap.peek_max_item(), Some((Tick(3), 24))); + + // the heap's max item is still Tick(3) after inserting a smaller item + heap.push(Tick(2), 64); + matches!(heap.peek_max_item(), Some((Tick(3), 24))); + + // remove the old max item and confirm the second max item is now the max item + heap.pop_item(&Tick(3)); + matches!(heap.peek_max_item(), Some((Tick(2), 64))); + } + #[test] fn test_time_heap() { let mut heap = ReadyBuffer::::new();