Skip to content

Commit

Permalink
Cleanup stream handling and improve update streams (#297)
Browse files Browse the repository at this point in the history
  • Loading branch information
YouKnow-sys authored Dec 20, 2024
1 parent b942670 commit bb46ef0
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 99 deletions.
3 changes: 0 additions & 3 deletions lib/grammers-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ serde = ["grammers-tl-types/impl-serde"]
[dependencies]
chrono = "0.4.38"
futures = "0.3.31"
futures-util = { version = "0.3.30", default-features = false, features = [
"alloc"
] }
grammers-crypto = { path = "../grammers-crypto", version = "0.7.0" }
grammers-mtproto = { path = "../grammers-mtproto", version = "0.7.0" }
grammers-mtsender = { path = "../grammers-mtsender", version = "0.7.0" }
Expand Down
4 changes: 0 additions & 4 deletions lib/grammers-client/DEPS.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ without having to use `Box`.

Provides Stream functionality

## futures-util

Provides useful functions for working with futures/tasks.

## url

Used to parse certain URLs to offer features such as joining private chats via their invite link.
14 changes: 9 additions & 5 deletions lib/grammers-client/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@
//! cargo run --example echo -- BOT_TOKEN
//! ```
use futures::StreamExt;
use futures_util::future::{select, Either};
use grammers_client::session::Session;
use grammers_client::{Client, Config, InitParams, Update};
use simple_logger::SimpleLogger;
use std::env;
use std::pin::pin;

use futures::{
future::{select, Either},
StreamExt,
};
use simple_logger::SimpleLogger;
use tokio::{runtime, task};

use grammers_client::session::Session;
use grammers_client::{Client, Config, InitParams, Update};

type Result = std::result::Result<(), Box<dyn std::error::Error>>;

const SESSION_FILE: &str = "echo.session";
Expand Down
14 changes: 9 additions & 5 deletions lib/grammers-client/examples/inline-pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@
//! how much data a button's payload can contain, and to keep it simple, we're storing it inline
//! in decimal, so the numbers can't get too large).
use futures::StreamExt;
use futures_util::future::{select, Either};
use grammers_client::session::Session;
use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update};
use simple_logger::SimpleLogger;
use std::env;
use std::pin::pin;

use futures::{
future::{select, Either},
StreamExt,
};
use simple_logger::SimpleLogger;
use tokio::{runtime, task};

use grammers_client::session::Session;
use grammers_client::{button, reply_markup, Client, Config, InputMessage, Update};

type Result = std::result::Result<(), Box<dyn std::error::Error>>;

const SESSION_FILE: &str = "inline-pagination.session";
Expand Down
22 changes: 10 additions & 12 deletions lib/grammers-client/src/client/chats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ use grammers_session::{PackedChat, PackedType};
use grammers_tl_types as tl;

use super::Client;
use crate::types::{
chats::AdminRightsBuilderInner, chats::BannedRightsBuilderInner, AdminRightsBuilder,
BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant, Photo, User,
use crate::{
types::{
chats::{AdminRightsBuilderInner, BannedRightsBuilderInner},
AdminRightsBuilder, BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant,
Photo, User,
},
utils::poll_future_ready,
};
pub use grammers_mtsender::{AuthorizationError, InvocationError};

Expand Down Expand Up @@ -210,9 +214,7 @@ impl Stream for ParticipantStream {
Self::Empty => {}
Self::Chat { buffer, .. } => {
if buffer.is_empty() {
let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) {
return Poll::Ready(Some(Err(e)));
}
}
Expand All @@ -225,9 +227,7 @@ impl Stream for ParticipantStream {
}
}

let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down Expand Up @@ -341,9 +341,7 @@ impl Stream for ProfilePhotoStream {
}
}

let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer()) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down
5 changes: 2 additions & 3 deletions lib/grammers-client/src/client/dialogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use grammers_session::PackedChat;
use grammers_tl_types as tl;

use crate::types::{ChatMap, Dialog, IterBuffer, Message};
use crate::utils::poll_future_ready;
use crate::Client;

const MAX_LIMIT: usize = 100;
Expand Down Expand Up @@ -79,9 +80,7 @@ impl Stream for DialogStream {

let result = {
self.request.limit = self.determine_limit(MAX_LIMIT);
let this = self.client.invoke(&self.request);
futures::pin_mut!(this);
futures::ready!(this.poll(cx))
poll_future_ready!(cx, self.client.invoke(&self.request))
}?;

let (dialogs, messages, users, chats) = match result {
Expand Down
18 changes: 7 additions & 11 deletions lib/grammers-client/src/client/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use std::future::Future;
use std::task::Poll;
use std::{io::SeekFrom, path::Path, sync::Arc};

use futures::{Stream, TryStreamExt};
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures::{
stream::{FuturesUnordered, StreamExt},
Stream, TryStreamExt,
};
use tokio::sync::mpsc::unbounded_channel;
use tokio::{
fs,
Expand All @@ -22,7 +24,7 @@ use grammers_mtsender::InvocationError;
use grammers_tl_types as tl;

use crate::types::{photo_sizes::PhotoSize, Downloadable, Media, Uploaded};
use crate::utils::generate_random_id;
use crate::utils::{generate_random_id, poll_future_ready};
use crate::Client;

pub const MIN_CHUNK_SIZE: i32 = 4 * 1024;
Expand Down Expand Up @@ -139,15 +141,9 @@ impl Stream for DownloadStream {
loop {
let result = match self.dc.take() {
Some(dc) => {
let this = self.client.invoke_in_dc(&self.request, dc as i32);
futures::pin_mut!(this);
futures::ready!(this.poll(cx))
}
None => {
let this = self.client.invoke(&self.request);
futures::pin_mut!(this);
futures::ready!(this.poll(cx))
poll_future_ready!(cx, self.client.invoke_in_dc(&self.request, dc as i32))
}
None => poll_future_ready!(cx, self.client.invoke(&self.request)),
};

break match result {
Expand Down
14 changes: 4 additions & 10 deletions lib/grammers-client/src/client/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! Methods related to sending messages.
use crate::types::message::EMPTY_MESSAGE;
use crate::types::{InputReactions, IterBuffer, Message};
use crate::utils::{generate_random_id, generate_random_ids};
use crate::utils::{generate_random_id, generate_random_ids, poll_future_ready};
use crate::{types, ChatMap, Client, InputMedia};
use chrono::{DateTime, FixedOffset};
use futures::Stream;
Expand Down Expand Up @@ -249,9 +249,7 @@ impl Stream for MessageStream {
{
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down Expand Up @@ -390,9 +388,7 @@ impl Stream for SearchStream {
{
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
if let Err(e) = poll_future_ready!(cx, self.fill_buffer(limit)) {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down Expand Up @@ -479,9 +475,7 @@ impl Stream for GlobalSearchStream {
let offset_rate = {
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
match futures::ready!(this.poll(cx)) {
match poll_future_ready!(cx, self.fill_buffer(limit)) {
Ok(offset_rate) => offset_rate,
Err(e) => return Poll::Ready(Some(Err(e))),
}
Expand Down
Loading

0 comments on commit bb46ef0

Please sign in to comment.