diff --git a/postgres/src/connection.rs b/postgres/src/connection.rs index b91c1655..2cd8155d 100644 --- a/postgres/src/connection.rs +++ b/postgres/src/connection.rs @@ -1,9 +1,9 @@ use crate::{Error, Notification}; -use futures_util::{future, pin_mut, Stream}; +use futures_util::Stream; use std::collections::VecDeque; -use std::future::Future; +use std::future::{self, Future}; use std::ops::{Deref, DerefMut}; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -52,7 +52,7 @@ impl Connection { where F: Future>, { - pin_mut!(future); + let mut future = pin!(future); self.poll_block_on(|cx, _, _| future.as_mut().poll(cx)) } diff --git a/postgres/src/notifications.rs b/postgres/src/notifications.rs index 0c040ded..33fd3ae4 100644 --- a/postgres/src/notifications.rs +++ b/postgres/src/notifications.rs @@ -3,9 +3,9 @@ use crate::connection::ConnectionRef; use crate::{Error, Notification}; use fallible_iterator::FallibleIterator; -use futures_util::{ready, FutureExt}; +use futures_util::FutureExt; use std::pin::Pin; -use std::task::Poll; +use std::task::{ready, Poll}; use std::time::Duration; use tokio::time::{self, Instant, Sleep}; diff --git a/tokio-postgres/src/binary_copy.rs b/tokio-postgres/src/binary_copy.rs index dab14166..9928ef5c 100644 --- a/tokio-postgres/src/binary_copy.rs +++ b/tokio-postgres/src/binary_copy.rs @@ -4,7 +4,7 @@ use crate::types::{FromSql, IsNull, ToSql, Type, WrongType}; use crate::{slice_iter, CopyInSink, CopyOutStream, Error}; use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures_util::{ready, SinkExt, Stream}; +use futures_util::{SinkExt, Stream}; use pin_project_lite::pin_project; use postgres_types::BorrowToSql; use std::convert::TryFrom; @@ -13,7 +13,7 @@ use std::io::Cursor; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; const MAGIC: &[u8] = b"PGCOPY\n\xff\r\n\0"; const HEADER_LEN: usize = MAGIC.len() + 4 + 4; diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index b38bbba3..4d276125 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -19,16 +19,18 @@ use crate::{ use bytes::{Buf, BytesMut}; use fallible_iterator::FallibleIterator; use futures_channel::mpsc; -use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt}; +use futures_util::{ready, StreamExt, TryStreamExt}; use parking_lot::Mutex; use postgres_protocol::message::backend::Message; use postgres_types::BorrowToSql; use std::collections::HashMap; use std::fmt; +use std::future; #[cfg(feature = "runtime")] use std::net::IpAddr; #[cfg(feature = "runtime")] use std::path::PathBuf; +use std::pin::pin; use std::sync::Arc; use std::task::{Context, Poll}; #[cfg(feature = "runtime")] @@ -300,8 +302,7 @@ impl Client { where T: ?Sized + ToStatement, { - let stream = self.query_raw(statement, slice_iter(params)).await?; - pin_mut!(stream); + let mut stream = pin!(self.query_raw(statement, slice_iter(params)).await?); let mut first = None; @@ -336,18 +337,18 @@ impl Client { /// /// ```no_run /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> { - /// use futures_util::{pin_mut, TryStreamExt}; + /// use std::pin::pin; + /// use futures_util::TryStreamExt; /// /// let params: Vec = vec![ /// "first param".into(), /// "second param".into(), /// ]; - /// let mut it = client.query_raw( + /// let mut it = pin!(client.query_raw( /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2", /// params, - /// ).await?; + /// ).await?); /// - /// pin_mut!(it); /// while let Some(row) = it.try_next().await? { /// let foo: i32 = row.get("foo"); /// println!("foo: {}", foo); @@ -402,19 +403,19 @@ impl Client { /// /// ```no_run /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> { - /// use futures_util::{pin_mut, TryStreamExt}; + /// use std::pin::pin; + /// use futures_util::{TryStreamExt}; /// use tokio_postgres::types::Type; /// /// let params: Vec<(String, Type)> = vec![ /// ("first param".into(), Type::TEXT), /// ("second param".into(), Type::TEXT), /// ]; - /// let mut it = client.query_typed_raw( + /// let mut it = pin!(client.query_typed_raw( /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2", /// params, - /// ).await?; + /// ).await?); /// - /// pin_mut!(it); /// while let Some(row) = it.try_next().await? { /// let foo: i32 = row.get("foo"); /// println!("foo: {}", foo); diff --git a/tokio-postgres/src/connect.rs b/tokio-postgres/src/connect.rs index e97a7a2a..67c6e4ba 100644 --- a/tokio-postgres/src/connect.rs +++ b/tokio-postgres/src/connect.rs @@ -4,8 +4,10 @@ use crate::connect_raw::connect_raw; use crate::connect_socket::connect_socket; use crate::tls::MakeTlsConnect; use crate::{Client, Config, Connection, Error, SimpleQueryMessage, Socket}; -use futures_util::{future, pin_mut, Future, FutureExt, Stream}; +use futures_util::{FutureExt, Stream}; use rand::seq::SliceRandom; +use std::future::{self, Future}; +use std::pin::pin; use std::task::Poll; use std::{cmp, io}; use tokio::net; @@ -161,18 +163,18 @@ where let (mut client, mut connection) = connect_raw(socket, tls, has_hostname, config).await?; if config.target_session_attrs != TargetSessionAttrs::Any { - let rows = client.simple_query_raw("SHOW transaction_read_only"); - pin_mut!(rows); + let mut rows = pin!(client.simple_query_raw("SHOW transaction_read_only")); - let rows = future::poll_fn(|cx| { - if connection.poll_unpin(cx)?.is_ready() { - return Poll::Ready(Err(Error::closed())); - } + let mut rows = pin!( + future::poll_fn(|cx| { + if connection.poll_unpin(cx)?.is_ready() { + return Poll::Ready(Err(Error::closed())); + } - rows.as_mut().poll(cx) - }) - .await?; - pin_mut!(rows); + rows.as_mut().poll(cx) + }) + .await? + ); loop { let next = future::poll_fn(|cx| { diff --git a/tokio-postgres/src/connect_raw.rs b/tokio-postgres/src/connect_raw.rs index cf7476ca..5f27fed7 100644 --- a/tokio-postgres/src/connect_raw.rs +++ b/tokio-postgres/src/connect_raw.rs @@ -7,7 +7,7 @@ use crate::{Client, Connection, Error}; use bytes::BytesMut; use fallible_iterator::FallibleIterator; use futures_channel::mpsc; -use futures_util::{ready, Sink, SinkExt, Stream, TryStreamExt}; +use futures_util::{Sink, SinkExt, Stream, TryStreamExt}; use postgres_protocol::authentication; use postgres_protocol::authentication::sasl; use postgres_protocol::authentication::sasl::ScramSha256; @@ -17,7 +17,7 @@ use std::borrow::Cow; use std::collections::{HashMap, VecDeque}; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; diff --git a/tokio-postgres/src/connection.rs b/tokio-postgres/src/connection.rs index 41433595..f5e63a81 100644 --- a/tokio-postgres/src/connection.rs +++ b/tokio-postgres/src/connection.rs @@ -6,14 +6,14 @@ use crate::{AsyncMessage, Error, Notification}; use bytes::BytesMut; use fallible_iterator::FallibleIterator; use futures_channel::mpsc; -use futures_util::{ready, stream::FusedStream, Sink, Stream, StreamExt}; +use futures_util::{stream::FusedStream, Sink, Stream, StreamExt}; use log::{info, trace}; use postgres_protocol::message::backend::Message; use postgres_protocol::message::frontend; use std::collections::{HashMap, VecDeque}; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; diff --git a/tokio-postgres/src/copy_in.rs b/tokio-postgres/src/copy_in.rs index 59e31fea..74081754 100644 --- a/tokio-postgres/src/copy_in.rs +++ b/tokio-postgres/src/copy_in.rs @@ -5,15 +5,16 @@ use crate::query::extract_row_affected; use crate::{query, slice_iter, Error, Statement}; use bytes::{Buf, BufMut, BytesMut}; use futures_channel::mpsc; -use futures_util::{future, ready, Sink, SinkExt, Stream, StreamExt}; +use futures_util::{Sink, SinkExt, Stream, StreamExt}; use log::debug; use pin_project_lite::pin_project; use postgres_protocol::message::backend::Message; use postgres_protocol::message::frontend; use postgres_protocol::message::frontend::CopyData; +use std::future; use std::marker::{PhantomData, PhantomPinned}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; enum CopyInMessage { Message(FrontendMessage), diff --git a/tokio-postgres/src/copy_out.rs b/tokio-postgres/src/copy_out.rs index 1e694925..6bb435aa 100644 --- a/tokio-postgres/src/copy_out.rs +++ b/tokio-postgres/src/copy_out.rs @@ -3,13 +3,13 @@ use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::{query, slice_iter, Error, Statement}; use bytes::Bytes; -use futures_util::{ready, Stream}; +use futures_util::Stream; use log::debug; use pin_project_lite::pin_project; use postgres_protocol::message::backend::Message; use std::marker::PhantomPinned; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result { debug!("executing copy out statement {}", statement.name()); diff --git a/tokio-postgres/src/prepare.rs b/tokio-postgres/src/prepare.rs index 1d9bacb1..72fe3372 100644 --- a/tokio-postgres/src/prepare.rs +++ b/tokio-postgres/src/prepare.rs @@ -7,12 +7,12 @@ use crate::{query, slice_iter}; use crate::{Column, Error, Statement}; use bytes::Bytes; use fallible_iterator::FallibleIterator; -use futures_util::{pin_mut, TryStreamExt}; +use futures_util::TryStreamExt; use log::debug; use postgres_protocol::message::backend::Message; use postgres_protocol::message::frontend; use std::future::Future; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -142,8 +142,7 @@ pub(crate) async fn get_type(client: &Arc, oid: Oid) -> Result row, diff --git a/tokio-postgres/src/query.rs b/tokio-postgres/src/query.rs index 2fcb22d5..0e107c99 100644 --- a/tokio-postgres/src/query.rs +++ b/tokio-postgres/src/query.rs @@ -6,7 +6,7 @@ use crate::types::{BorrowToSql, IsNull}; use crate::{Column, Error, Portal, Row, Statement}; use bytes::{Bytes, BytesMut}; use fallible_iterator::FallibleIterator; -use futures_util::{ready, Stream}; +use futures_util::Stream; use log::{debug, log_enabled, Level}; use pin_project_lite::pin_project; use postgres_protocol::message::backend::{CommandCompleteBody, Message}; @@ -16,7 +16,7 @@ use std::fmt; use std::marker::PhantomPinned; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; struct BorrowToSqlParamsDebug<'a, T>(&'a [T]); diff --git a/tokio-postgres/src/simple_query.rs b/tokio-postgres/src/simple_query.rs index 24473b89..cb9d2eb2 100644 --- a/tokio-postgres/src/simple_query.rs +++ b/tokio-postgres/src/simple_query.rs @@ -5,7 +5,7 @@ use crate::query::extract_row_affected; use crate::{Error, SimpleQueryMessage, SimpleQueryRow}; use bytes::Bytes; use fallible_iterator::FallibleIterator; -use futures_util::{ready, Stream}; +use futures_util::Stream; use log::debug; use pin_project_lite::pin_project; use postgres_protocol::message::backend::Message; @@ -13,7 +13,7 @@ use postgres_protocol::message::frontend; use std::marker::PhantomPinned; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; /// Information about a column of a single query row. #[derive(Debug)] diff --git a/tokio-postgres/tests/test/binary_copy.rs b/tokio-postgres/tests/test/binary_copy.rs index 94b96ab8..12b08f67 100644 --- a/tokio-postgres/tests/test/binary_copy.rs +++ b/tokio-postgres/tests/test/binary_copy.rs @@ -1,5 +1,6 @@ use crate::connect; -use futures_util::{pin_mut, TryStreamExt}; +use futures_util::TryStreamExt; +use std::pin::pin; use tokio_postgres::binary_copy::{BinaryCopyInWriter, BinaryCopyOutStream}; use tokio_postgres::types::Type; @@ -16,8 +17,7 @@ async fn write_basic() { .copy_in("COPY foo (id, bar) FROM STDIN BINARY") .await .unwrap(); - let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]); - pin_mut!(writer); + let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT])); writer.as_mut().write(&[&1i32, &"foobar"]).await.unwrap(); writer .as_mut() @@ -50,8 +50,7 @@ async fn write_many_rows() { .copy_in("COPY foo (id, bar) FROM STDIN BINARY") .await .unwrap(); - let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT]); - pin_mut!(writer); + let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT])); for i in 0..10_000i32 { writer @@ -86,8 +85,7 @@ async fn write_big_rows() { .copy_in("COPY foo (id, bar) FROM STDIN BINARY") .await .unwrap(); - let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA]); - pin_mut!(writer); + let mut writer = pin!(BinaryCopyInWriter::new(sink, &[Type::INT4, Type::BYTEA])); for i in 0..2i32 { writer diff --git a/tokio-postgres/tests/test/main.rs b/tokio-postgres/tests/test/main.rs index 9a6aa26f..d7add9fd 100644 --- a/tokio-postgres/tests/test/main.rs +++ b/tokio-postgres/tests/test/main.rs @@ -2,12 +2,11 @@ use bytes::{Bytes, BytesMut}; use futures_channel::mpsc; -use futures_util::{ - future, join, pin_mut, stream, try_join, Future, FutureExt, SinkExt, StreamExt, TryStreamExt, -}; +use futures_util::{join, stream, try_join, FutureExt, SinkExt, StreamExt, TryStreamExt}; use pin_project_lite::pin_project; use std::fmt::Write; -use std::pin::Pin; +use std::future::{self, Future}; +use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use std::time::Duration; use tokio::net::TcpStream; @@ -589,8 +588,7 @@ async fn copy_in() { .into_iter() .map(Ok::<_, Error>), ); - let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap(); - pin_mut!(sink); + let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap()); sink.send_all(&mut stream).await.unwrap(); let rows = sink.finish().await.unwrap(); assert_eq!(rows, 2); @@ -636,8 +634,7 @@ async fn copy_in_large() { .map(Ok::<_, Error>), ); - let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap(); - pin_mut!(sink); + let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap()); sink.send_all(&mut stream).await.unwrap(); let rows = sink.finish().await.unwrap(); assert_eq!(rows, 10_000); @@ -658,8 +655,7 @@ async fn copy_in_error() { .unwrap(); { - let sink = client.copy_in("COPY foo FROM STDIN").await.unwrap(); - pin_mut!(sink); + let mut sink = pin!(client.copy_in("COPY foo FROM STDIN").await.unwrap()); sink.send(Bytes::from_static(b"1\tsteven")).await.unwrap(); }