Skip to content

Commit

Permalink
implement Stream for Responses
Browse files Browse the repository at this point in the history
Signed-off-by: Petros Angelatos <[email protected]>
  • Loading branch information
petrosagg committed Aug 16, 2024
1 parent fbc9944 commit e7ecfce
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
use bytes::{Buf, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
use futures_util::{future, pin_mut, ready, Stream, StreamExt, TryStreamExt};
use parking_lot::Mutex;
use postgres_protocol::message::backend::Message;
use postgres_types::BorrowToSql;
Expand All @@ -29,6 +29,7 @@ use std::fmt;
use std::net::IpAddr;
#[cfg(feature = "runtime")]
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -61,6 +62,17 @@ impl Responses {
}
}

impl Stream for Responses {
type Item = Result<Message, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!((*self).poll_next(cx)) {
Err(err) if err.is_closed() => Poll::Ready(None),
msg => Poll::Ready(Some(msg)),
}
}
}

/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [prepare](prepare) module).
#[derive(Default)]
Expand Down

0 comments on commit e7ecfce

Please sign in to comment.