diff --git a/runtime/swimos_rocks_store/src/iterator.rs b/runtime/swimos_rocks_store/src/iterator.rs deleted file mode 100644 index f2a1b3e4f..000000000 --- a/runtime/swimos_rocks_store/src/iterator.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2015-2024 Swim Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// todo: add iterator direction -#[derive(Default)] -pub struct EngineIterOpts; diff --git a/runtime/swimos_rocks_store/src/lib.rs b/runtime/swimos_rocks_store/src/lib.rs index 1fda933c2..4e2a9dfed 100644 --- a/runtime/swimos_rocks_store/src/lib.rs +++ b/runtime/swimos_rocks_store/src/lib.rs @@ -19,7 +19,6 @@ mod agent; mod engine; -mod iterator; mod keyspaces; #[cfg(test)] mod nostore; diff --git a/server/swimos_agent/src/lanes/map/mod.rs b/server/swimos_agent/src/lanes/map/mod.rs index d0db14bbd..870a4bddf 100644 --- a/server/swimos_agent/src/lanes/map/mod.rs +++ b/server/swimos_agent/src/lanes/map/mod.rs @@ -13,19 +13,19 @@ // limitations under the License. use bytes::BytesMut; -use frunk::{Coprod, Coproduct}; +use frunk::Coprod; use static_assertions::assert_impl_all; use std::{ any::type_name, borrow::Borrow, cell::RefCell, - collections::HashMap, + collections::{HashMap, VecDeque}, fmt::{Debug, Formatter}, hash::Hash, marker::PhantomData, }; use swimos_agent_protocol::{encoding::lane::MapLaneResponseEncoder, MapMessage}; -use swimos_form::{read::RecognizerReadable, write::StructuralWritable}; +use swimos_form::{read::RecognizerReadable, write::StructuralWritable, Form}; use swimos_recon::parser::RecognizerDecoder; use tokio_util::codec::{Decoder, Encoder}; use uuid::Uuid; @@ -582,6 +582,7 @@ type MapLaneHandler = Coprod!( MapLaneUpdate, MapLaneRemove, MapLaneClear, + MapLaneDropOrTake, ); impl HandlerTrans> for ProjTransform> { @@ -591,16 +592,17 @@ impl HandlerTrans> for ProjTransform> let ProjTransform { projection } = self; match input { MapMessage::Update { key, value } => { - Coproduct::Inl(MapLaneUpdate::new(projection, key, value)) + MapLaneHandler::inject(MapLaneUpdate::new(projection, key, value)) } MapMessage::Remove { key } => { - Coproduct::Inr(Coproduct::Inl(MapLaneRemove::new(projection, key))) + MapLaneHandler::inject(MapLaneRemove::new(projection, key)) } - MapMessage::Clear => Coproduct::Inr(Coproduct::Inr(Coproduct::Inl(MapLaneClear::new( - projection, - )))), - _ => { - todo!("Drop and take not yet implemented.") + MapMessage::Clear => MapLaneHandler::inject(MapLaneClear::new(projection)), + MapMessage::Drop(n) => { + MapLaneHandler::inject(MapLaneDropOrTake::new(projection, DropOrTake::Drop, n)) + } + MapMessage::Take(n) => { + MapLaneHandler::inject(MapLaneDropOrTake::new(projection, DropOrTake::Take, n)) } } } @@ -705,7 +707,7 @@ pub fn decode_and_apply( ) -> DecodeAndApply where C: AgentDescription, - K: Clone + Eq + Hash + RecognizerReadable, + K: Form + Clone + Eq + Hash, V: RecognizerReadable, { let decode: DecodeMapMessage = DecodeMapMessage::new(message); @@ -1133,6 +1135,7 @@ pub enum DecodeAndSelectApply { Updating(MapLaneSelectUpdate), Removing(MapLaneSelectRemove), Clearing(MapLaneSelectClear), + DroppingOrTaking(MapLaneSelectDropOrTake), #[default] Done, } @@ -1140,7 +1143,7 @@ pub enum DecodeAndSelectApply { impl HandlerAction for DecodeAndSelectApply where C: AgentDescription, - K: RecognizerReadable + Clone + Eq + Hash, + K: Form + Clone + Eq + Hash, V: RecognizerReadable, F: SelectorFn>, { @@ -1180,33 +1183,46 @@ where selector, )); } - _ => { - todo!("Drop and take not yet implemented.") + MapMessage::Drop(n) => { + *self = DecodeAndSelectApply::DroppingOrTaking( + MapLaneSelectDropOrTake::new(selector, DropOrTake::Drop, n), + ); + } + MapMessage::Take(n) => { + *self = DecodeAndSelectApply::DroppingOrTaking( + MapLaneSelectDropOrTake::new(selector, DropOrTake::Take, n), + ); } } - StepResult::Continue { modified_item } } } } DecodeAndSelectApply::Updating(mut selector) => { let result = selector.step(action_context, meta, context); - if !result.is_cont() { - *self = DecodeAndSelectApply::Done; + if result.is_cont() { + *self = DecodeAndSelectApply::Updating(selector); } result } DecodeAndSelectApply::Removing(mut selector) => { let result = selector.step(action_context, meta, context); - if !result.is_cont() { - *self = DecodeAndSelectApply::Done; + if result.is_cont() { + *self = DecodeAndSelectApply::Removing(selector); } result } DecodeAndSelectApply::Clearing(mut selector) => { let result = selector.step(action_context, meta, context); - if !result.is_cont() { - *self = DecodeAndSelectApply::Done; + if result.is_cont() { + *self = DecodeAndSelectApply::Clearing(selector); + } + result + } + DecodeAndSelectApply::DroppingOrTaking(mut selector) => { + let result = selector.step(action_context, meta, context); + if result.is_cont() { + *self = DecodeAndSelectApply::DroppingOrTaking(selector); } result } @@ -1237,6 +1253,11 @@ where .field("state", &"Clearing") .field("selector", &Described::new(context, selector)) .finish(), + DecodeAndSelectApply::DroppingOrTaking(selector) => f + .debug_struct("DecodeAndSelectApply") + .field("state", &"DroppingOrTaking") + .field("selector", &Described::new(context, selector)) + .finish(), DecodeAndSelectApply::Done => f .debug_tuple("DecodeAndSelectSet") .field(&"<>") @@ -1325,3 +1346,364 @@ where } } } + +struct MapLaneRemoveMultiple { + projection: for<'a> fn(&'a C) -> &'a MapLane, + keys: Option>, + current: Option>, +} + +impl MapLaneRemoveMultiple { + fn new(projection: for<'a> fn(&'a C) -> &'a MapLane, keys: VecDeque) -> Self { + MapLaneRemoveMultiple { + projection, + keys: Some(keys), + current: None, + } + } +} + +impl HandlerAction for MapLaneRemoveMultiple +where + C: AgentDescription, + K: Clone + Eq + Hash, +{ + type Completion = (); + + fn step( + &mut self, + action_context: &mut ActionContext, + meta: AgentMetadata, + context: &C, + ) -> StepResult { + let MapLaneRemoveMultiple { + projection, + keys, + current, + } = self; + if let Some(key_queue) = keys { + let h = if let Some(h) = current { + h + } else if let Some(next) = key_queue.pop_front() { + current.insert(MapLaneRemove::new(*projection, next)) + } else { + *keys = None; + return StepResult::done(()); + }; + match h.step(action_context, meta, context) { + StepResult::Continue { modified_item } => StepResult::Continue { modified_item }, + StepResult::Fail(err) => StepResult::Fail(err), + StepResult::Complete { modified_item, .. } => { + *current = None; + StepResult::Continue { modified_item } + } + } + } else { + StepResult::after_done() + } + } + + fn describe(&self, context: &C, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + let MapLaneRemoveMultiple { + projection, keys, .. + } = self; + let lane = projection(context); + let name = context.item_name(lane.id()); + f.debug_struct("MapLaneRemoveMultiple") + .field("id", &lane.id()) + .field("lane_name", &name.as_ref().map(|s| s.as_ref())) + .field("keys", &keys.as_ref().map(|v| v.len())) + .field("consumed", &keys.is_none()) + .finish() + } +} + +#[derive(Debug, Clone, Copy)] +enum DropOrTake { + Drop, + Take, +} + +enum DropOrTakeState { + Init, + Removing(MapLaneRemoveMultiple), +} + +/// An [event handler](crate::event_handler::EventHandler)`] that will either retain or drop the first `n` elements +/// from a map (ordering the keys by the ordering of their Recon model representations). +pub struct MapLaneDropOrTake { + projection: for<'a> fn(&'a C) -> &'a MapLane, + kind: DropOrTake, + number: u64, + state: DropOrTakeState, +} + +impl MapLaneDropOrTake { + fn new( + projection: for<'a> fn(&'a C) -> &'a MapLane, + kind: DropOrTake, + number: u64, + ) -> Self { + MapLaneDropOrTake { + projection, + kind, + number, + state: DropOrTakeState::Init, + } + } +} + +fn drop_or_take(map: &HashMap, kind: DropOrTake, number: usize) -> VecDeque +where + K: StructuralWritable + Clone + Eq + Hash, +{ + let mut keys_with_recon = map.keys().map(|k| (k.structure(), k)).collect::>(); + keys_with_recon.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + let mut to_remove: VecDeque = VecDeque::new(); + match kind { + DropOrTake::Drop => { + to_remove.extend( + keys_with_recon + .into_iter() + .take(number) + .map(|(_, k1)| k1.clone()), + ); + } + DropOrTake::Take => { + to_remove.extend( + keys_with_recon + .into_iter() + .skip(number) + .map(|(_, k1)| k1.clone()), + ); + } + } + to_remove +} + +impl HandlerAction for MapLaneDropOrTake +where + C: AgentDescription, + K: StructuralWritable + Clone + Eq + Hash, +{ + type Completion = (); + + fn step( + &mut self, + action_context: &mut ActionContext, + meta: AgentMetadata, + context: &C, + ) -> StepResult { + let MapLaneDropOrTake { + projection, + kind, + number, + state, + } = self; + match state { + DropOrTakeState::Init => { + let n = match usize::try_from(*number) { + Ok(n) => n, + Err(err) => { + return StepResult::Fail(EventHandlerError::EffectError(Box::new(err))) + } + }; + let lane = projection(context); + let to_remove = lane.get_map(|map| drop_or_take(map, *kind, n)); + let mut handler = MapLaneRemoveMultiple::new(*projection, to_remove); + let result = handler.step(action_context, meta, context); + *state = DropOrTakeState::Removing(handler); + result + } + DropOrTakeState::Removing(h) => h.step(action_context, meta, context), + } + } + + fn describe(&self, context: &C, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + let MapLaneDropOrTake { + projection, + kind, + number, + state, + } = self; + let mut dbg = f.debug_struct("MapLaneRemoveMultiple"); + dbg.field("kind", kind).field("number", number); + match state { + DropOrTakeState::Init => { + let lane = projection(context); + let name = context.item_name(lane.id()); + dbg.field("id", &lane.id()) + .field("lane_name", &name.as_ref().map(|s| s.as_ref())) + .field("state", &"Init") + .finish() + } + DropOrTakeState::Removing(h) => dbg + .field("state", &"Removing") + .field("inner", &Described::new(context, h)) + .finish(), + } + } +} + +struct MapLaneSelectRemoveMultiple { + _type: PhantomData, + projection: F, + keys: Option>, +} + +impl MapLaneSelectRemoveMultiple { + fn new(projection: F, keys: VecDeque) -> Self { + MapLaneSelectRemoveMultiple { + _type: PhantomData, + projection, + keys: Some(keys), + } + } +} + +impl HandlerAction for MapLaneSelectRemoveMultiple +where + K: Clone + Eq + Hash, + F: SelectorFn>, +{ + type Completion = (); + + fn step( + &mut self, + _action_context: &mut ActionContext, + _meta: AgentMetadata, + context: &C, + ) -> StepResult { + let MapLaneSelectRemoveMultiple { + projection, keys, .. + } = self; + if let Some(key_queue) = keys { + if let Some(next) = key_queue.pop_front() { + let selector = projection.selector(context); + if let Some(lane) = selector.select() { + lane.remove(&next); + StepResult::Continue { + modified_item: Some(Modification::of(lane.id)), + } + } else { + StepResult::Fail(EventHandlerError::LaneNotFound(selector.name().to_owned())) + } + } else { + *keys = None; + StepResult::done(()) + } + } else { + StepResult::after_done() + } + } + + fn describe(&self, _context: &C, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + let MapLaneSelectRemoveMultiple { + projection, keys, .. + } = self; + f.debug_struct("MapLaneSelectRemoveMultiple") + .field("lane_name", &projection.name()) + .field("keys", &keys.as_ref().map(|v| v.len())) + .field("consumed", &keys.is_none()) + .finish() + } +} + +#[derive(Default)] +enum SelectDropOrTakeState { + Init(F), + Removing(MapLaneSelectRemoveMultiple), + #[default] + Done, +} + +/// An [event handler](crate::event_handler::EventHandler) that attempts to drop or retain the first or last `n` elements of +/// a map lane, if that lane exists. +pub struct MapLaneSelectDropOrTake { + kind: DropOrTake, + number: u64, + state: SelectDropOrTakeState, +} + +impl MapLaneSelectDropOrTake { + fn new(projection: F, kind: DropOrTake, number: u64) -> Self { + MapLaneSelectDropOrTake { + kind, + number, + state: SelectDropOrTakeState::Init(projection), + } + } +} + +impl HandlerAction for MapLaneSelectDropOrTake +where + C: AgentDescription, + K: StructuralWritable + Clone + Eq + Hash, + F: SelectorFn>, +{ + type Completion = (); + + fn step( + &mut self, + action_context: &mut ActionContext, + meta: AgentMetadata, + context: &C, + ) -> StepResult { + let MapLaneSelectDropOrTake { + kind, + number, + state, + } = self; + match std::mem::take(state) { + SelectDropOrTakeState::Init(projection) => { + let n = match usize::try_from(*number) { + Ok(n) => n, + Err(err) => { + return StepResult::Fail(EventHandlerError::EffectError(Box::new(err))) + } + }; + let selector = projection.selector(context); + let to_remove = if let Some(lane) = selector.select() { + lane.get_map(|map| drop_or_take(map, *kind, n)) + } else { + return StepResult::Fail(EventHandlerError::LaneNotFound( + selector.name().to_owned(), + )); + }; + drop(selector); + let mut handler = MapLaneSelectRemoveMultiple::new(projection, to_remove); + let result = handler.step(action_context, meta, context); + if result.is_cont() { + *state = SelectDropOrTakeState::Removing(handler); + } + result + } + SelectDropOrTakeState::Removing(mut h) => { + let result = h.step(action_context, meta, context); + if result.is_cont() { + *state = SelectDropOrTakeState::Removing(h); + } + result + } + SelectDropOrTakeState::Done => StepResult::after_done(), + } + } + + fn describe(&self, context: &C, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + let MapLaneSelectDropOrTake { + kind, + number, + state, + } = self; + let mut dbg = f.debug_struct("MapLaneRemoveMultiple"); + dbg.field("kind", kind).field("number", number); + match state { + SelectDropOrTakeState::Init(_) => dbg.field("state", &"Init").finish(), + SelectDropOrTakeState::Removing(h) => dbg + .field("state", &"Removing") + .field("inner", &Described::new(context, h)) + .finish(), + SelectDropOrTakeState::Done => dbg.field("state", &"Done").finish(), + } + } +} diff --git a/server/swimos_agent/src/lanes/map/tests.rs b/server/swimos_agent/src/lanes/map/tests.rs index bc72a3a97..12947d368 100644 --- a/server/swimos_agent/src/lanes/map/tests.rs +++ b/server/swimos_agent/src/lanes/map/tests.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; +use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; -use std::{borrow::Cow, collections::HashMap}; use bytes::BytesMut; use swimos_agent_protocol::{ @@ -25,6 +26,8 @@ use swimos_utilities::routing::RouteUri; use tokio_util::codec::Decoder; use uuid::Uuid; +use crate::event_handler::ModificationFlags; +use crate::lanes::map::MapLaneSelectDropOrTake; use crate::{ agent_model::{AgentDescription, WriteResult}, event_handler::{EventHandlerError, HandlerAction, Modification, StepResult}, @@ -41,7 +44,7 @@ use crate::{ test_context::dummy_context, }; -use super::MapLaneWithEntry; +use super::{drop_or_take, DropOrTake, MapLaneDropOrTake, MapLaneWithEntry}; const ID: u64 = 74; @@ -857,7 +860,7 @@ struct TestSelectorFn(bool); impl SelectorFn for TestSelectorFn { type Target = MapLane; - fn selector(self, context: &TestAgent) -> impl Selector + '_ { + fn selector<'a>(&'a self, context: &'a TestAgent) -> impl Selector + 'a { TestSelector(context, self.0) } @@ -1111,3 +1114,287 @@ fn map_lane_select_sync_event_handler_missing() { panic!("Lane not found error expected."); } } + +#[test] +fn drop_take_choose_keys() { + let map = [(1, 2), (2, 4), (3, 6), (4, 8)] + .into_iter() + .collect::>(); + + assert_eq!( + drop_or_take(&map, super::DropOrTake::Drop, 2), + VecDeque::from(vec![1, 2]) + ); + assert_eq!( + drop_or_take(&map, super::DropOrTake::Take, 3), + VecDeque::from(vec![4]) + ); + assert_eq!( + drop_or_take(&map, super::DropOrTake::Drop, 4), + VecDeque::from(vec![1, 2, 3, 4]) + ); + assert_eq!( + drop_or_take(&map, super::DropOrTake::Take, 4), + VecDeque::from(vec![]) + ); + assert_eq!( + drop_or_take(&map, super::DropOrTake::Drop, 10), + VecDeque::from(vec![1, 2, 3, 4]) + ); + assert_eq!( + drop_or_take(&map, super::DropOrTake::Take, 10), + VecDeque::from(vec![]) + ); + assert_eq!( + drop_or_take(&map, super::DropOrTake::Drop, 0), + VecDeque::from(vec![]) + ); + assert_eq!( + drop_or_take(&map, super::DropOrTake::Take, 0), + VecDeque::from(vec![1, 2, 3, 4]) + ); +} + +#[test] +fn map_lane_drop_event_handler() { + let uri = make_uri(); + let route_params = HashMap::new(); + let meta = make_meta(&uri, &route_params); + let agent = TestAgent::with_init(); + + let mut handler = MapLaneDropOrTake::new(TestAgent::LANE, DropOrTake::Drop, 2); + + let mut removals = vec![]; + loop { + match handler.step( + &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), + meta, + &agent, + ) { + StepResult::Continue { modified_item } => { + let Modification { item_id, flags } = modified_item.expect("No change made."); + assert_eq!(item_id, LANE_ID); + assert_eq!(flags, ModificationFlags::all()); + agent.lane.read_with_prev(|event, _| { + if let Some(MapLaneEvent::Remove(k, _)) = event { + removals.push(k); + } else { + panic!("Expected only removals."); + } + }) + } + StepResult::Fail(err) => panic!("Failed: {}", err), + StepResult::Complete { modified_item, .. } => { + if let Some(Modification { item_id, flags }) = modified_item { + assert_eq!(item_id, LANE_ID); + assert_eq!(flags, ModificationFlags::all()); + agent.lane.read_with_prev(|event, _| { + if let Some(MapLaneEvent::Remove(k, _)) = event { + removals.push(k); + } else { + panic!("Expected only removals."); + } + }) + } + break; + } + } + } + + let result = handler.step( + &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), + meta, + &agent, + ); + assert!(matches!( + result, + StepResult::Fail(EventHandlerError::SteppedAfterComplete) + )); + + assert_eq!(removals.len(), 2); + assert!(removals.contains(&K1)); + assert!(removals.contains(&K3)); +} + +#[test] +fn map_lane_take_event_handler() { + let uri = make_uri(); + let route_params = HashMap::new(); + let meta = make_meta(&uri, &route_params); + let agent = TestAgent::with_init(); + + let mut handler = MapLaneDropOrTake::new(TestAgent::LANE, DropOrTake::Take, 1); + + let mut removals = vec![]; + loop { + match handler.step( + &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), + meta, + &agent, + ) { + StepResult::Continue { modified_item } => { + let Modification { item_id, flags } = modified_item.expect("No change made."); + assert_eq!(item_id, LANE_ID); + assert_eq!(flags, ModificationFlags::all()); + agent.lane.read_with_prev(|event, _| { + if let Some(MapLaneEvent::Remove(k, _)) = event { + removals.push(k); + } else { + panic!("Expected only removals."); + } + }) + } + StepResult::Fail(err) => panic!("Failed: {}", err), + StepResult::Complete { modified_item, .. } => { + if let Some(Modification { item_id, flags }) = modified_item { + assert_eq!(item_id, LANE_ID); + assert_eq!(flags, ModificationFlags::all()); + agent.lane.read_with_prev(|event, _| { + if let Some(MapLaneEvent::Remove(k, _)) = event { + removals.push(k); + } else { + panic!("Expected only removals."); + } + }) + } + break; + } + } + } + + let result = handler.step( + &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), + meta, + &agent, + ); + assert!(matches!( + result, + StepResult::Fail(EventHandlerError::SteppedAfterComplete) + )); + + assert_eq!(removals.len(), 2); + assert!(removals.contains(&K1)); + assert!(removals.contains(&K2)); +} + +#[test] +fn map_lane_select_drop_event_handler() { + let uri = make_uri(); + let route_params = HashMap::new(); + let meta = make_meta(&uri, &route_params); + let agent = TestAgent::with_init(); + + let mut handler = MapLaneSelectDropOrTake::new(TestSelectorFn(true), DropOrTake::Drop, 2); + + let mut removals = vec![]; + loop { + match handler.step( + &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), + meta, + &agent, + ) { + StepResult::Continue { modified_item } => { + let Modification { item_id, flags } = modified_item.expect("No change made."); + assert_eq!(item_id, LANE_ID); + assert_eq!(flags, ModificationFlags::all()); + agent.lane.read_with_prev(|event, _| { + if let Some(MapLaneEvent::Remove(k, _)) = event { + removals.push(k); + } else { + panic!("Expected only removals."); + } + }) + } + StepResult::Fail(err) => panic!("Failed: {}", err), + StepResult::Complete { modified_item, .. } => { + if let Some(Modification { item_id, flags }) = modified_item { + assert_eq!(item_id, LANE_ID); + assert_eq!(flags, ModificationFlags::all()); + agent.lane.read_with_prev(|event, _| { + if let Some(MapLaneEvent::Remove(k, _)) = event { + removals.push(k); + } else { + panic!("Expected only removals."); + } + }) + } + break; + } + } + } + + let result = handler.step( + &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), + meta, + &agent, + ); + assert!(matches!( + result, + StepResult::Fail(EventHandlerError::SteppedAfterComplete) + )); + + assert_eq!(removals.len(), 2); + assert!(removals.contains(&K1)); + assert!(removals.contains(&K3)); +} + +#[test] +fn map_lane_select_take_event_handler() { + let uri = make_uri(); + let route_params = HashMap::new(); + let meta = make_meta(&uri, &route_params); + let agent = TestAgent::with_init(); + + let mut handler = MapLaneSelectDropOrTake::new(TestSelectorFn(true), DropOrTake::Take, 1); + + let mut removals = vec![]; + loop { + match handler.step( + &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), + meta, + &agent, + ) { + StepResult::Continue { modified_item } => { + let Modification { item_id, flags } = modified_item.expect("No change made."); + assert_eq!(item_id, LANE_ID); + assert_eq!(flags, ModificationFlags::all()); + agent.lane.read_with_prev(|event, _| { + if let Some(MapLaneEvent::Remove(k, _)) = event { + removals.push(k); + } else { + panic!("Expected only removals."); + } + }) + } + StepResult::Fail(err) => panic!("Failed: {}", err), + StepResult::Complete { modified_item, .. } => { + if let Some(Modification { item_id, flags }) = modified_item { + assert_eq!(item_id, LANE_ID); + assert_eq!(flags, ModificationFlags::all()); + agent.lane.read_with_prev(|event, _| { + if let Some(MapLaneEvent::Remove(k, _)) = event { + removals.push(k); + } else { + panic!("Expected only removals."); + } + }) + } + break; + } + } + } + + let result = handler.step( + &mut dummy_context(&mut HashMap::new(), &mut BytesMut::new()), + meta, + &agent, + ); + assert!(matches!( + result, + StepResult::Fail(EventHandlerError::SteppedAfterComplete) + )); + + assert_eq!(removals.len(), 2); + assert!(removals.contains(&K1)); + assert!(removals.contains(&K2)); +} diff --git a/server/swimos_agent/src/lanes/mod.rs b/server/swimos_agent/src/lanes/mod.rs index dc1462a52..1ed89fd65 100644 --- a/server/swimos_agent/src/lanes/mod.rs +++ b/server/swimos_agent/src/lanes/mod.rs @@ -109,7 +109,7 @@ pub trait SelectorFn { /// /// #Arguments /// * `context` - The context form the the [`Selector`] will attempt to select its component. - fn selector(self, context: &C) -> impl Selector + '_; + fn selector<'a>(&'a self, context: &'a C) -> impl Selector + 'a; } /// An [event handler](crate::event_handler::EventHandler) that attempts to open a new lane for the diff --git a/server/swimos_agent/src/lanes/value/tests.rs b/server/swimos_agent/src/lanes/value/tests.rs index 39ae1bb48..4cb642296 100644 --- a/server/swimos_agent/src/lanes/value/tests.rs +++ b/server/swimos_agent/src/lanes/value/tests.rs @@ -483,7 +483,7 @@ struct TestSelectorFn(bool); impl SelectorFn for TestSelectorFn { type Target = ValueLane; - fn selector(self, context: &TestAgent) -> impl Selector + '_ { + fn selector<'a>(&'a self, context: &'a TestAgent) -> impl Selector + 'a { TestSelector(context, self.0) } diff --git a/server/swimos_connector/src/generic/mod.rs b/server/swimos_connector/src/generic/mod.rs index 66358294e..685388cd5 100644 --- a/server/swimos_connector/src/generic/mod.rs +++ b/server/swimos_connector/src/generic/mod.rs @@ -322,11 +322,11 @@ impl AgentSpec for ConnectorAgent { struct LaneSelector<'a, L> { map: Ref<'a, HashMap>, - name: String, + name: &'a str, } impl<'a, L> LaneSelector<'a, L> { - fn new(map: Ref<'a, HashMap>, name: String) -> Self { + fn new(map: Ref<'a, HashMap>, name: &'a str) -> Self { LaneSelector { map, name } } } @@ -336,11 +336,11 @@ impl<'a, L> Selector for LaneSelector<'a, L> { fn select(&self) -> Option<&Self::Target> { let LaneSelector { map, name } = self; - map.get(name.as_str()) + map.get(*name) } fn name(&self) -> &str { - self.name.as_str() + self.name } } @@ -365,10 +365,13 @@ impl SelectorFn for ValueLaneSelectorFn { &self.name } - fn selector(self, context: &ConnectorAgent) -> impl Selector + '_ { + fn selector<'a>( + &'a self, + context: &'a ConnectorAgent, + ) -> impl Selector + 'a { let ConnectorAgent { value_lanes, .. } = context; let map = value_lanes.borrow(); - LaneSelector::new(map, self.name) + LaneSelector::new(map, &self.name) } } @@ -389,10 +392,13 @@ impl MapLaneSelectorFn { impl SelectorFn for MapLaneSelectorFn { type Target = GenericMapLane; - fn selector(self, context: &ConnectorAgent) -> impl Selector + '_ { + fn selector<'a>( + &'a self, + context: &'a ConnectorAgent, + ) -> impl Selector + 'a { let ConnectorAgent { map_lanes, .. } = context; let map = map_lanes.borrow(); - LaneSelector::new(map, self.name) + LaneSelector::new(map, &self.name) } fn name(&self) -> &str {