Skip to content

Commit

Permalink
feat(listener): poll a port multiple times until either "max_poll" or…
Browse files Browse the repository at this point in the history
… returned "None" (#78)

* feat(listener): poll a port multiple times until either "max_poll" or returned "None"

fixes #71

* feat(EventListenerCfg): add function "port_1" to add a Port directly

to apply custom options to a port

---------

Co-authored-by: veeso <[email protected]>
  • Loading branch information
hasezoey and veeso authored Oct 13, 2024
1 parent 3d7325f commit 3c8e556
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 45 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ Released on 13/10/2024
- Bump `ratatui` version to `0.28`
- Dont enable `MouseCapture` by default
- Add function `enable_mouse_capture` and `disable_mouse_capture` to `TerminalBridge`
- **Max poll for ports**:
- Add `Port::set_max_poll` to set the amount a `Port` is polled in a single `Port::should_poll`.
- Add `EventListenerCfg::port` to add a manually constructed `Port`
- Previous `EventListenerCfg::port` has been renamed to `EventListenerCfg::add_port`

Huge thanks to [hasezoey](https://github.com/hasezoey) for the contributions.

Expand Down
2 changes: 1 addition & 1 deletion examples/demo/app/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ where

let mut app: Application<Id, Msg, NoUserEvent> = Application::init(
EventListenerCfg::default()
.crossterm_input_listener(Duration::from_millis(20))
.crossterm_input_listener(Duration::from_millis(20), 3)
.poll_timeout(Duration::from_millis(10))
.tick_interval(Duration::from_secs(1)),
);
Expand Down
5 changes: 3 additions & 2 deletions examples/user_events/user_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ impl Poll<UserEvent> for UserDataPort {

fn main() {
let event_listener = EventListenerCfg::default()
.crossterm_input_listener(Duration::from_millis(10))
.port(
.crossterm_input_listener(Duration::from_millis(10), 3)
.add_port(
Box::new(UserDataPort::default()),
Duration::from_millis(1000),
1,
);

let mut app: Application<Id, Msg, UserEvent> = Application::init(event_listener);
Expand Down
3 changes: 2 additions & 1 deletion src/core/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1035,9 +1035,10 @@ mod test {
}

fn listener_config() -> EventListenerCfg<MockEvent> {
EventListenerCfg::default().port(
EventListenerCfg::default().add_port(
Box::new(MockPoll::<MockEvent>::default()),
Duration::from_millis(100),
1,
)
}

Expand Down
52 changes: 41 additions & 11 deletions src/listener/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,45 @@ where
self
}

/// Add a new Port (Poll, Interval) to the the event listener
pub fn port(mut self, poll: Box<dyn Poll<U>>, interval: Duration) -> Self {
self.ports.push(Port::new(poll, interval));
/// Add a new [`Port`] (Poll, Interval) to the the event listener.
///
/// The interval is the amount of time between each [`Poll::poll`] call.
/// The max_poll is the maximum amount of times the port should be polled in a single poll.
pub fn add_port(self, poll: Box<dyn Poll<U>>, interval: Duration, max_poll: usize) -> Self {
self.port(Port::new(poll, interval, max_poll))
}

/// Add a new [`Port`] to the the event listener
///
/// The [`Port`] needs to be manually constructed, unlike [`Self::add_port`]
pub fn port(mut self, port: Port<U>) -> Self {
self.ports.push(port);
self
}

#[cfg(feature = "crossterm")]
/// Add to the event listener the default crossterm input listener [`crate::terminal::CrosstermInputListener`]
pub fn crossterm_input_listener(self, interval: Duration) -> Self {
self.port(
///
/// The interval is the amount of time between each [`Poll::poll`] call.
/// The max_poll is the maximum amount of times the port should be polled in a single poll.
pub fn crossterm_input_listener(self, interval: Duration, max_poll: usize) -> Self {
self.add_port(
Box::new(crate::terminal::CrosstermInputListener::<U>::new(interval)),
interval,
max_poll,
)
}

#[cfg(feature = "termion")]
/// Add to the event listener the default termion input listener [`crate::terminal::TermionInputListener`]
pub fn termion_input_listener(self, interval: Duration) -> Self {
self.port(
///
/// The interval is the amount of time between each [`Poll::poll`] call.
/// The max_poll is the maximum amount of times the port should be polled in a single poll.
pub fn termion_input_listener(self, interval: Duration, max_poll: usize) -> Self {
self.add_port(
Box::new(crate::terminal::TermionInputListener::<U>::new(interval)),
interval,
max_poll,
)
}
}
Expand All @@ -104,8 +122,8 @@ mod test {
let builder = builder.poll_timeout(Duration::from_millis(50));
assert_eq!(builder.poll_timeout, Duration::from_millis(50));
let builder = builder
.crossterm_input_listener(Duration::from_millis(200))
.port(Box::new(MockPoll::default()), Duration::from_secs(300));
.crossterm_input_listener(Duration::from_millis(200), 1)
.add_port(Box::new(MockPoll::default()), Duration::from_secs(300), 1);
assert_eq!(builder.ports.len(), 2);
let mut listener = builder.start();
assert!(listener.stop().is_ok());
Expand All @@ -123,8 +141,8 @@ mod test {
let builder = builder.poll_timeout(Duration::from_millis(50));
assert_eq!(builder.poll_timeout, Duration::from_millis(50));
let builder = builder
.termion_input_listener(Duration::from_millis(200))
.port(Box::new(MockPoll::default()), Duration::from_secs(300));
.termion_input_listener(Duration::from_millis(200), 1)
.add_port(Box::new(MockPoll::default()), Duration::from_secs(300), 1);
assert_eq!(builder.ports.len(), 2);
let mut listener = builder.start();
assert!(listener.stop().is_ok());
Expand All @@ -137,4 +155,16 @@ mod test {
.poll_timeout(Duration::from_secs(0))
.start();
}

#[test]
fn should_add_port_via_port_1() {
let builder = EventListenerCfg::<MockEvent>::default();
assert!(builder.ports.is_empty());
let builder = builder.port(Port::new(
Box::new(MockPoll::default()),
Duration::from_millis(1),
1,
));
assert_eq!(builder.ports.len(), 1);
}
}
1 change: 1 addition & 0 deletions src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ mod test {
vec![Port::new(
Box::new(MockPoll::default()),
Duration::from_secs(10),
1,
)],
Duration::from_millis(10),
Some(Duration::from_secs(3)),
Expand Down
19 changes: 16 additions & 3 deletions src/listener/port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,34 @@ where
poll: Box<dyn Poll<U>>,
interval: Duration,
next_poll: Instant,
max_poll: usize,
}

impl<U> Port<U>
where
U: Eq + PartialEq + Clone + PartialOrd + Send + 'static,
{
/// Define a new `Port`
pub fn new(poll: Box<dyn Poll<U>>, interval: Duration) -> Self {
/// Define a new [`Port`]
///
/// # Parameters
///
/// * `poll` - The poll trait object
/// * `interval` - The interval between each poll
/// * `max_poll` - The maximum amount of times the port should be polled in a single poll
pub fn new(poll: Box<dyn Poll<U>>, interval: Duration, max_poll: usize) -> Self {
Self {
poll,
interval,
next_poll: Instant::now(),
max_poll,
}
}

/// Get how often a port should get polled in a single poll
pub fn max_poll(&self) -> usize {
self.max_poll
}

/// Returns the interval for the current [`Port`]
pub fn interval(&self) -> &Duration {
&self.interval
Expand Down Expand Up @@ -69,7 +82,7 @@ mod test {
#[test]
fn test_single_listener() {
let mut listener =
Port::<MockEvent>::new(Box::new(MockPoll::default()), Duration::from_secs(5));
Port::<MockEvent>::new(Box::new(MockPoll::default()), Duration::from_secs(5), 1);
assert!(listener.next_poll() <= Instant::now());
assert_eq!(listener.should_poll(), true);
assert!(listener.poll().ok().unwrap().is_some());
Expand Down
78 changes: 51 additions & 27 deletions src/listener/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,35 +118,32 @@ where
/// Returns only the messages, while the None returned by poll are discarded
#[allow(clippy::needless_collect)]
fn poll(&mut self) -> Result<(), mpsc::SendError<ListenerMsg<U>>> {
let msg: Vec<ListenerMsg<U>> = self
.ports
.iter_mut()
.filter_map(|x| {
if x.should_poll() {
let msg = match x.poll() {
Ok(Some(ev)) => Some(ListenerMsg::User(ev)),
Ok(None) => None,
Err(err) => Some(ListenerMsg::Error(err)),
};
// Update next poll
x.calc_next_poll();
msg
} else {
None
let port_iter = self.ports.iter_mut().filter(|port| port.should_poll());

for port in port_iter {
let mut times_remaining = port.max_poll();
// poll a port until it has nothing anymore
loop {
let msg = match port.poll() {
Ok(Some(ev)) => ListenerMsg::User(ev),
Ok(None) => break,
Err(err) => ListenerMsg::Error(err),
};

self.sender.send(msg)?;

// do this at the end to at least call it once
times_remaining = times_remaining.saturating_sub(1);

if times_remaining == 0 {
break;
}
})
.collect();
// Send messages
match msg
.into_iter()
.map(|x| self.sender.send(x))
.filter(|x| x.is_err())
.map(|x| x.err().unwrap())
.next()
{
None => Ok(()),
Some(e) => Err(e),
}
// Update next poll
port.calc_next_poll();
}

Ok(())
}

/// thread run method
Expand Down Expand Up @@ -186,6 +183,29 @@ mod test {
use crate::mock::{MockEvent, MockPoll};
use crate::Event;

#[test]
fn worker_should_poll_multiple_times() {
let (tx, rx) = mpsc::channel();
let paused = Arc::new(RwLock::new(false));
let paused_t = Arc::clone(&paused);
let running = Arc::new(RwLock::new(true));
let running_t = Arc::clone(&running);

let mock_port = Port::new(Box::new(MockPoll::default()), Duration::from_secs(5), 10);

let mut worker =
EventListenerWorker::<MockEvent>::new(vec![mock_port], tx, paused_t, running_t, None);
assert!(worker.poll().is_ok());
assert!(worker.next_event() <= Duration::from_secs(5));
let mut recieved = Vec::new();

while let Ok(msg) = rx.try_recv() {
recieved.push(msg);
}

assert_eq!(recieved.len(), 10);
}

#[test]
fn worker_should_send_poll() {
let (tx, rx) = mpsc::channel();
Expand All @@ -197,6 +217,7 @@ mod test {
vec![Port::new(
Box::new(MockPoll::default()),
Duration::from_secs(5),
1,
)],
tx,
paused_t,
Expand All @@ -223,6 +244,7 @@ mod test {
vec![Port::new(
Box::new(MockPoll::default()),
Duration::from_secs(5),
1,
)],
tx,
paused_t,
Expand All @@ -249,6 +271,7 @@ mod test {
vec![Port::new(
Box::new(MockPoll::default()),
Duration::from_secs(5),
1,
)],
tx,
paused_t,
Expand Down Expand Up @@ -293,6 +316,7 @@ mod test {
vec![Port::new(
Box::new(MockPoll::default()),
Duration::from_secs(3),
1,
)],
tx,
paused_t,
Expand Down

0 comments on commit 3c8e556

Please sign in to comment.