Skip to content

Commit

Permalink
update ParticipantStream. ProfilePhotoStream, SearchStream
Browse files Browse the repository at this point in the history
took the data out of photo_size_data in `DownloadStream` instead of streaming
return size_hint when available from DialogStream
updated doctests
updated DEPS.md
  • Loading branch information
YouKnow-sys committed Dec 20, 2024
1 parent 366e457 commit 19ce92a
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 102 deletions.
4 changes: 4 additions & 0 deletions lib/grammers-client/DEPS.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ Used to test that this file lists all dependencies from `Cargo.toml`.
Used for return custom types that `impl Future` so that the requests can be further configured
without having to use `Box`.

## futures

Provides Stream functionality

## futures-util

Provides useful functions for working with futures/tasks.
Expand Down
158 changes: 94 additions & 64 deletions lib/grammers-client/src/client/chats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,32 @@

//! Methods related to users, groups and channels.
use std::collections::VecDeque;
use std::future::Future;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

use futures::Stream;

use grammers_mtsender::RpcError;
use grammers_session::{PackedChat, PackedType};
use grammers_tl_types as tl;

use super::Client;
use crate::types::{
chats::AdminRightsBuilderInner, chats::BannedRightsBuilderInner, AdminRightsBuilder,
BannedRightsBuilder, Chat, ChatMap, IterBuffer, Message, Participant, Photo, User,
};
use grammers_mtsender::RpcError;
pub use grammers_mtsender::{AuthorizationError, InvocationError};
use grammers_session::{PackedChat, PackedType};
use grammers_tl_types as tl;
use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

const MAX_PARTICIPANT_LIMIT: usize = 200;
const MAX_PHOTO_LIMIT: usize = 100;
const KICK_BAN_DURATION: i32 = 60; // in seconds, in case the second request fails

pub enum ParticipantIter {
pub enum ParticipantStream {
Empty,
Chat {
client: Client,
Expand All @@ -37,7 +44,7 @@ pub enum ParticipantIter {
Channel(IterBuffer<tl::functions::channels::GetParticipants, Participant>),
}

impl ParticipantIter {
impl ParticipantStream {
fn new(client: &Client, chat: PackedChat) -> Self {
if let Some(channel) = chat.try_to_input_channel() {
Self::Channel(IterBuffer::from_request(
Expand Down Expand Up @@ -179,58 +186,70 @@ impl ParticipantIter {
}
}

/// Return the next `Participant` from the internal buffer, filling the buffer previously if
/// it's empty.
///
/// Returns `None` if the `limit` is reached or there are no participants left.
pub async fn next(&mut self) -> Result<Option<Participant>, InvocationError> {
// Need to split the `match` because `fill_buffer()` borrows mutably.
/// apply a filter on fetched participants, note that this filter will apply only on large `Channel` and not small groups
pub fn filter_participants(mut self, filter: tl::enums::ChannelParticipantsFilter) -> Self {
match self {
ParticipantStream::Channel(ref mut c) => {
c.request.filter = filter;
self
}
_ => self,
}
}
}

impl Stream for ParticipantStream {
type Item = Result<Participant, InvocationError>;

fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
// Need to split the `match` because `fill_buffer()` borrows mutably.
match self.deref_mut() {
Self::Empty => {}
Self::Chat { buffer, .. } => {
if buffer.is_empty() {
self.fill_buffer().await?;
let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
return Poll::Ready(Some(Err(e)));
}
}
}
Self::Channel(iter) => {
if let Some(result) = iter.next_raw() {
return result;
match result {
Ok(m) => return Poll::Ready(m.map(Ok)),
Err(e) => return Poll::Ready(Some(Err(e))),
}
}

let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
return Poll::Ready(Some(Err(e)));
}
self.fill_buffer().await?;
}
}

match self {
Self::Empty => Ok(None),
match self.deref_mut() {
Self::Empty => Poll::Ready(None),
Self::Chat { buffer, .. } => {
let result = buffer.pop_front();
if buffer.is_empty() {
*self = Self::Empty;
}
Ok(result)
Poll::Ready(result.map(Ok))
}
Self::Channel(iter) => Ok(iter.pop_item()),
}
}

/// apply a filter on fetched participants, note that this filter will apply only on large `Channel` and not small groups
pub fn filter(mut self, filter: tl::enums::ChannelParticipantsFilter) -> Self {
match self {
ParticipantIter::Channel(ref mut c) => {
c.request.filter = filter;
self
}
_ => self,
Self::Channel(iter) => Poll::Ready(iter.pop_item().map(Ok)),
}
}
}

pub enum ProfilePhotoIter {
pub enum ProfilePhotoStream {
User(IterBuffer<tl::functions::photos::GetUserPhotos, Photo>),
Chat(IterBuffer<tl::functions::messages::Search, Message>),
}

impl ProfilePhotoIter {
impl ProfilePhotoStream {
fn new(client: &Client, chat: PackedChat) -> Self {
if let Some(user_id) = chat.try_to_input_user() {
Self::User(IterBuffer::from_request(
Expand Down Expand Up @@ -300,37 +319,46 @@ impl ProfilePhotoIter {
Self::Chat(_) => panic!("fill_buffer should not be called for Chat variant"),
}
}
}

/// Return the next photo from the internal buffer, filling the buffer previously if it's
/// empty.
///
/// Returns `None` if the `limit` is reached or there are no photos left.
pub async fn next(&mut self) -> Result<Option<Photo>, InvocationError> {
impl Stream for ProfilePhotoStream {
type Item = Result<Photo, InvocationError>;

fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
// Need to split the `match` because `fill_buffer()` borrows mutably.
match self {
match self.deref_mut() {
Self::User(iter) => {
if let Some(result) = iter.next_raw() {
return result;
match result {
Ok(m) => return Poll::Ready(m.map(Ok)),
Err(e) => return Poll::Ready(Some(Err(e))),
}
}

let this = self.fill_buffer();
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
return Poll::Ready(Some(Err(e)));
}
self.fill_buffer().await?;
}
Self::Chat(iter) => {
while let Some(message) = iter.next().await? {
if let Some(tl::enums::MessageAction::ChatEditPhoto(
tl::types::MessageActionChatEditPhoto { photo },
)) = message.raw_action
{
return Ok(Some(Photo::from_raw(photo)));
} else {
continue;
Self::Chat(ref mut iter) => {
while let Some(maybe_message) = futures::ready!(Pin::new(&mut *iter).poll_next(cx)) {
match maybe_message {
Ok(message) => if let Some(tl::enums::MessageAction::ChatEditPhoto(
tl::types::MessageActionChatEditPhoto { photo },
)) = message.raw_action
{
return Poll::Ready(Some(Ok(Photo::from_raw(photo))));
},
Err(e) => return Poll::Ready(Some(Err(e))),
}
}
}
}

match self {
Self::User(iter) => Ok(iter.pop_item()),
Self::Chat(_) => Ok(None),
match self.get_mut() {
Self::User(iter) => Poll::Ready(iter.pop_item().map(Ok)),
Self::Chat(_) => Poll::Ready(None),
}
}
}
Expand Down Expand Up @@ -430,7 +458,7 @@ impl Client {
Ok(User::from_raw(res.pop().unwrap()))
}

/// Iterate over the participants of a chat.
/// Get a stream over participants of a chat.
///
/// The participants are returned in no particular order.
///
Expand All @@ -439,10 +467,11 @@ impl Client {
/// # Examples
///
/// ```
/// # use futures::TryStreamExt;
/// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
/// let mut participants = client.iter_participants(&chat);
/// let mut participants = client.stream_participants(&chat);
///
/// while let Some(participant) = participants.next().await? {
/// while let Some(participant) = participants.try_next().await? {
/// println!(
/// "{} has role {:?}",
/// participant.user.first_name().unwrap_or(&participant.user.id().to_string()),
Expand All @@ -452,8 +481,8 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub fn iter_participants<C: Into<PackedChat>>(&self, chat: C) -> ParticipantIter {
ParticipantIter::new(self, chat.into())
pub fn stream_participants<C: Into<PackedChat>>(&self, chat: C) -> ParticipantStream {
ParticipantStream::new(self, chat.into())
}

/// Kicks the participant from the chat.
Expand Down Expand Up @@ -603,7 +632,7 @@ impl Client {
)
}

/// Iterate over the history of profile photos for the given user or chat.
/// Get stream over the history of profile photos for the given user or chat.
///
/// Note that the current photo might not be present in the history, and to avoid doing more
/// work when it's generally not needed (the photo history tends to be complete but in some
Expand All @@ -615,17 +644,18 @@ impl Client {
/// # Examples
///
/// ```
/// # use futures::TryStreamExt;
/// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
/// let mut photos = client.iter_profile_photos(&chat);
///
/// while let Some(photo) = photos.next().await? {
/// while let Some(photo) = photos.try_next().await? {
/// println!("Did you know chat has a photo with ID {}?", photo.id());
/// }
/// # Ok(())
/// # }
/// ```
pub fn iter_profile_photos<C: Into<PackedChat>>(&self, chat: C) -> ProfilePhotoIter {
ProfilePhotoIter::new(self, chat.into())
pub fn stream_profile_photos<C: Into<PackedChat>>(&self, chat: C) -> ProfilePhotoStream {
ProfilePhotoStream::new(self, chat.into())
}

/// Convert a [`PackedChat`] back into a [`Chat`].
Expand Down
22 changes: 16 additions & 6 deletions lib/grammers-client/src/client/dialogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ impl Stream for DialogStream {
) -> Poll<Option<Self::Item>> {
if let Some(result) = self.next_raw() {
match result {
Ok(Some(d)) => return Poll::Ready(Some(Ok(d))),
Ok(m) => return Poll::Ready(m.map(Ok)),
Err(e) => return Poll::Ready(Some(Err(e))),
_ => (),
}
}

Expand Down Expand Up @@ -159,23 +158,34 @@ impl Stream for DialogStream {

Poll::Ready(self.pop_item().map(Ok))
}

fn size_hint(&self) -> (usize, Option<usize>) {
match self.total {
Some(total) => {
let rem = total - self.fetched;
(rem, Some(rem))
},
None => (0, None),
}
}
}

/// Method implementations related to open conversations.
impl Client {
/// Returns a new iterator over the dialogs.
/// Returns a new stream over the dialogs.
///
/// While iterating, the update state for any broadcast channel or megagroup will be set if it was unknown before.
/// While streaming, the update state for any broadcast channel or megagroup will be set if it was unknown before.
/// When the update state is set for these chats, the library can actively check to make sure it's not missing any
/// updates from them (as long as the queue limit for updates is larger than zero).
///
/// # Examples
///
/// ```
/// # use futures::TryStreamExt;
/// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
/// let mut dialogs = client.iter_dialogs();
/// let mut dialogs = client.stream_dialogs();
///
/// while let Some(dialog) = dialogs.next().await? {
/// while let Some(dialog) = dialogs.try_next().await? {
/// let chat = dialog.chat();
/// println!("{} ({})", chat.name().unwrap_or_default(), chat.id());
/// }
Expand Down
12 changes: 6 additions & 6 deletions lib/grammers-client/src/client/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ impl Stream for DownloadStream {
return Poll::Ready(None);
}

if let Some(data) = &self.photo_size_data {
let data = data.clone();
if let Some(data) = self.photo_size_data.take() {
self.done = true;
return Poll::Ready(Some(Ok(data)));
}
Expand Down Expand Up @@ -178,16 +177,17 @@ impl Stream for DownloadStream {

/// Method implementations related to uploading or downloading files.
impl Client {
/// Returns a new iterator over the contents of a media document that will be downloaded.
/// Returns a new stream over the contents of a media document that will be downloaded.
///
/// # Examples
///
/// ```
/// # use futures::TryStreamExt;
/// # async fn f(downloadable: grammers_client::types::Downloadable, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
/// let mut file_bytes = Vec::new();
/// let mut download = client.iter_download(&downloadable);
/// let mut download = client.stream_download(&downloadable);
///
/// while let Some(chunk) = download.next().await? {
/// while let Some(chunk) = download.try_next().await? {
/// file_bytes.extend(chunk);
/// }
///
Expand All @@ -203,7 +203,7 @@ impl Client {
///
/// If the file already exists, it will be overwritten.
///
/// This is a small wrapper around [`Client::iter_download`] for the common case of
/// This is a small wrapper around [`Client::stream_download`] for the common case of
/// wanting to save the file locally.
///
/// # Examples
Expand Down
Loading

0 comments on commit 19ce92a

Please sign in to comment.