Skip to content

Commit

Permalink
Revert "Partially revert "Cleanup stream handling and improve update …
Browse files Browse the repository at this point in the history
…streams (Lonami#297)""

This reverts commit 7d230d0.
  • Loading branch information
YouKnow-sys committed Dec 22, 2024
1 parent f44f673 commit 1e82f67
Showing 1 changed file with 90 additions and 44 deletions.
134 changes: 90 additions & 44 deletions lib/grammers-client/src/client/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,83 @@

//! Methods to deal with and offer access to updates.
use super::Client;
use crate::types::{ChatMap, Update};
use futures::stream::FusedStream;
use futures::Stream;
use futures_util::future::{select, Either};
pub use grammers_mtsender::{AuthorizationError, InvocationError};
use grammers_session::channel_id;
pub use grammers_session::{PrematureEndReason, UpdateState};
use grammers_tl_types as tl;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::{Duration, Instant};

use futures::{
future::{select, Either},
stream::FusedStream,
Stream,
};
use tokio::time::sleep_until;

pub use grammers_mtsender::{AuthorizationError, InvocationError};
use grammers_session::channel_id;
pub use grammers_session::{PrematureEndReason, UpdateState};
use grammers_tl_types as tl;

use super::Client;
use crate::types::{ChatMap, Update};
use crate::utils::{poll_future, poll_future_ready};

/// How long to wait after warning the user that the updates limit was exceeded.
const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(300);

impl Client {
/// Returns the next update from the buffer where they are queued until used.
/// Returns a stream over raw updates.
///
/// # Example
///
/// ```
/// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
/// use futures::TryStreamExt;
/// use grammers_client::Update;
///
/// client
/// .raw_update_stream()
/// .try_for_each(|(update, _)| {
/// // Print all incoming updates in their raw form
/// dbg!(update);
/// futures::future::ready(Ok(()))
/// })
/// .await;
/// # Ok(())
/// # }
/// ```
pub fn raw_update_stream(&self) -> RawUpdateStream<'_> {
RawUpdateStream { client: self }
}

/// Returns a stream over updates.
///
/// # Example
///
/// ```
/// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
/// use futures::TryStreamExt;
/// use grammers_client::Update;
///
/// loop {
/// let update = client.next_update().await?;
/// // Echo incoming messages and ignore everything else
/// match update {
/// Update::NewMessage(mut message) if !message.outgoing() => {
/// message.respond(message.text()).await?;
/// client
/// .update_stream()
/// .try_for_each_concurrent(None, |update| async {
/// match update {
/// Update::NewMessage(message) if !message.outgoing() => {
/// message.respond(message.text()).await.map(|_| ())
/// }
/// _ => Ok(()),
/// }
/// _ => {}
/// }
/// }
/// })
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn update_stream(&self) -> UpdateStream<'_> {
UpdateStream { client: self }
UpdateStream {
raw_stream: self.raw_update_stream(),
}
}

pub(crate) fn process_socket_updates(&self, all_updates: Vec<tl::enums::Updates>) {
Expand Down Expand Up @@ -132,33 +166,16 @@ impl Client {
}
}

pub struct UpdateStream<'a> {
pub struct RawUpdateStream<'a> {
client: &'a Client,
}

impl<'a> UpdateStream<'a> {
impl<'a> RawUpdateStream<'a> {
/// Returns the next raw update and associated chat map from the buffer where they are queued until used.
///
/// # Example
///
/// ```
/// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
/// loop {
/// let (update, chats) = client.next_raw_update().await?;
///
/// // Print all incoming updates in their raw form
/// dbg!(update);
/// }
/// # Ok(())
/// # }
///
/// ```
///
/// P.S. If you don't receive updateBotInlineSend, go to [@BotFather](https://t.me/BotFather), select your bot and click "Bot Settings", then "Inline Feedback" and select probability.
///
pub async fn next_raw_update(
&self,
) -> Result<(tl::enums::Update, Arc<ChatMap>), InvocationError> {
async fn next_raw_update(&self) -> Result<(tl::enums::Update, Arc<ChatMap>), InvocationError> {
loop {
let (deadline, get_diff, get_channel_diff) = {
let state = &mut *self.client.0.state.write().unwrap();
Expand Down Expand Up @@ -279,6 +296,37 @@ impl<'a> UpdateStream<'a> {
}
}

impl<'a> Stream for RawUpdateStream<'a> {
type Item = Result<(tl::enums::Update, Arc<ChatMap>), InvocationError>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
poll_future!(cx, self.next_raw_update()).map(Some)
}
}

impl<'a> FusedStream for RawUpdateStream<'a> {
fn is_terminated(&self) -> bool {
// The update stream is a continuous flow of updates.
// As a long-running stream, it never reaches a
// terminated state, hence we always return false.
false
}
}

pub struct UpdateStream<'a> {
raw_stream: RawUpdateStream<'a>,
}

impl<'a> UpdateStream<'a> {
/// Consume the [`UpdateStream`] and return the underlying [`RawUpdateStream`].
pub fn into_raw_update_stream(self) -> RawUpdateStream<'a> {
self.raw_stream
}
}

impl<'a> Stream for UpdateStream<'a> {
type Item = Result<Update, InvocationError>;

Expand All @@ -288,15 +336,13 @@ impl<'a> Stream for UpdateStream<'a> {
) -> Poll<Option<Self::Item>> {
loop {
let (update, chats) = {
let this = self.next_raw_update();
futures::pin_mut!(this);
match futures::ready!(this.poll(cx)) {
match poll_future_ready!(cx, self.raw_stream.next_raw_update()) {
Ok(update) => update,
Err(e) => return Poll::Ready(Some(Err(e))),
}
};

if let Some(update) = Update::new(self.client, update, &chats) {
if let Some(update) = Update::new(self.raw_stream.client, update, &chats) {
return Poll::Ready(Some(Ok(update)));
}
}
Expand Down

0 comments on commit 1e82f67

Please sign in to comment.