Skip to content

Commit

Permalink
Added broadcast channel
Browse files Browse the repository at this point in the history
Also flushed out the APIs in general.
  • Loading branch information
ecton committed Jan 20, 2025
1 parent 5385360 commit bd98d01
Show file tree
Hide file tree
Showing 3 changed files with 1,703 additions and 375 deletions.
28 changes: 19 additions & 9 deletions src/reactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ struct Futures {
}

impl Futures {
fn spawn(&mut self, future: PollChannelFuture) -> usize {
let id = self.push(future);
self.queue.push_back(id);
id
}

fn push(&mut self, future: PollChannelFuture) -> usize {
let mut id = None;
while !self.available.is_empty() {
Expand Down Expand Up @@ -228,7 +234,8 @@ impl Futures {
let mut ctx = Context::from_waker(&registered.waker);
match Pin::new(future).poll(&mut ctx) {
Poll::Ready(()) => {
self.registered.remove(id);
registered.future = None;
self.available.insert(id);
callbacks_executed += 1;
}
Poll::Pending => {}
Expand Down Expand Up @@ -305,7 +312,9 @@ impl CallbackExecutor {
self.channels.notify(id, &mut self.futures);
}
ChannelTask::Unregister(id) => {
self.channels.unregister(id);
if let Some(future_id) = self.channels.unregister(id) {
self.futures.wake(future_id);
}
}
},
BackgroundTask::ExecuteCallbacks(callbacks) => {
Expand Down Expand Up @@ -334,7 +343,7 @@ impl WatchedChannels {
return;
};
let future_id = channel.should_poll().then(|| {
futures.push(PollChannelFuture {
futures.spawn(PollChannelFuture {
channel: channel.clone(),
futures: Vec::new(),
})
Expand Down Expand Up @@ -364,11 +373,11 @@ impl WatchedChannels {
.push_back(channel.future_id.expect("initialized above"));
}

fn unregister(&mut self, id: usize) {
let Some(id) = self.by_id.remove(&id) else {
return;
};
self.registry.remove(id);
fn unregister(&mut self, id: usize) -> Option<usize> {
let id = self.by_id.remove(&id)?;
self.registry
.remove(id)
.and_then(|removed| removed.future_id)
}
}

Expand All @@ -388,6 +397,7 @@ impl Future for PollChannelFuture {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
if this.futures.is_empty() && !this.channel.poll(&mut this.futures) {
this.channel.disconnect();
return Poll::Ready(());
}
loop {
Expand All @@ -399,7 +409,7 @@ impl Future for PollChannelFuture {
match result {
Ok(()) => {}
Err(CallbackDisconnected) => {
self.channel.disconnect_callback();
self.channel.disconnect();
}
}
completed_one = true;
Expand Down
Loading

0 comments on commit bd98d01

Please sign in to comment.