From f1729e46367f257607d9b1a9b246acf367514a2c Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Wed, 23 Dec 2020 21:03:15 -0500 Subject: [PATCH] deps: upgrade to tokio v1.0 ecosystem --- postgres-native-tls/Cargo.toml | 6 ++-- postgres-openssl/Cargo.toml | 6 ++-- postgres-openssl/src/lib.rs | 50 +++++++++++++++++++++++++----- postgres-protocol/Cargo.toml | 2 +- postgres-types/Cargo.toml | 2 +- postgres-types/src/serde_json_1.rs | 1 - postgres/Cargo.toml | 4 +-- postgres/src/copy_out_reader.rs | 2 +- postgres/src/notifications.rs | 9 +++--- tokio-postgres/Cargo.toml | 8 ++--- tokio-postgres/src/binary_copy.rs | 2 +- tokio-postgres/src/copy_in.rs | 1 - 12 files changed, 63 insertions(+), 30 deletions(-) diff --git a/postgres-native-tls/Cargo.toml b/postgres-native-tls/Cargo.toml index 81dcf29e3..598531ad9 100644 --- a/postgres-native-tls/Cargo.toml +++ b/postgres-native-tls/Cargo.toml @@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"] [dependencies] futures = "0.3" native-tls = "0.2" -tokio = "0.3" -tokio-native-tls = "0.2" +tokio = "1.0" +tokio-native-tls = "0.3" tokio-postgres = { version = "0.6.0", path = "../tokio-postgres", default-features = false } [dev-dependencies] -tokio = { version = "0.3", features = ["full"] } +tokio = { version = "1.0", features = ["full"] } postgres = { version = "0.18.0", path = "../postgres" } diff --git a/postgres-openssl/Cargo.toml b/postgres-openssl/Cargo.toml index c825e929d..11fd9f828 100644 --- a/postgres-openssl/Cargo.toml +++ b/postgres-openssl/Cargo.toml @@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"] [dependencies] futures = "0.3" openssl = "0.10" -tokio = "0.3" -tokio-openssl = "0.5" +tokio = "1.0" +tokio-openssl = "0.6" tokio-postgres = { version = "0.6.0", path = "../tokio-postgres", default-features = false } [dev-dependencies] -tokio = { version = "0.3", features = ["full"] } +tokio = { version = "1.0", features = ["full"] } postgres = { version = "0.18.0", path = "../postgres" } diff --git a/postgres-openssl/src/lib.rs b/postgres-openssl/src/lib.rs index 3780f2082..cad06d486 100644 --- a/postgres-openssl/src/lib.rs +++ b/postgres-openssl/src/lib.rs @@ -48,8 +48,10 @@ use openssl::hash::MessageDigest; use openssl::nid::Nid; #[cfg(feature = "runtime")] use openssl::ssl::SslConnector; -use openssl::ssl::{ConnectConfiguration, SslRef}; -use std::fmt::Debug; +use openssl::ssl::{self, ConnectConfiguration, SslRef}; +use openssl::x509::X509VerifyResult; +use std::error::Error; +use std::fmt::{self, Debug}; use std::future::Future; use std::io; use std::pin::Pin; @@ -57,7 +59,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio_openssl::{HandshakeError, SslStream}; +use tokio_openssl::SslStream; use tokio_postgres::tls; #[cfg(feature = "runtime")] use tokio_postgres::tls::MakeTlsConnect; @@ -131,23 +133,55 @@ impl TlsConnector { impl TlsConnect for TlsConnector where - S: AsyncRead + AsyncWrite + Unpin + Debug + 'static + Sync + Send, + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Stream = TlsStream; - type Error = HandshakeError; + type Error = Box; #[allow(clippy::type_complexity)] - type Future = Pin, HandshakeError>> + Send>>; + type Future = Pin, Self::Error>> + Send>>; fn connect(self, stream: S) -> Self::Future { let future = async move { - let stream = tokio_openssl::connect(self.ssl, &self.domain, stream).await?; - Ok(TlsStream(stream)) + let ssl = self.ssl.into_ssl(&self.domain)?; + let mut stream = SslStream::new(ssl, stream)?; + match Pin::new(&mut stream).connect().await { + Ok(()) => Ok(TlsStream(stream)), + Err(error) => Err(Box::new(ConnectError { + error, + verify_result: stream.ssl().verify_result(), + }) as _), + } }; Box::pin(future) } } +#[derive(Debug)] +struct ConnectError { + error: ssl::Error, + verify_result: X509VerifyResult, +} + +impl fmt::Display for ConnectError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.error, fmt)?; + + if self.verify_result != X509VerifyResult::OK { + fmt.write_str(": ")?; + fmt::Display::fmt(&self.verify_result, fmt)?; + } + + Ok(()) + } +} + +impl Error for ConnectError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&self.error) + } +} + /// The stream returned by `TlsConnector`. pub struct TlsStream(SslStream); diff --git a/postgres-protocol/Cargo.toml b/postgres-protocol/Cargo.toml index 3ac6acfb3..ed5bca864 100644 --- a/postgres-protocol/Cargo.toml +++ b/postgres-protocol/Cargo.toml @@ -11,7 +11,7 @@ readme = "../README.md" [dependencies] base64 = "0.13" byteorder = "1.0" -bytes = "0.5" +bytes = "1.0" fallible-iterator = "0.2" hmac = "0.10" md5 = "0.7" diff --git a/postgres-types/Cargo.toml b/postgres-types/Cargo.toml index c5e706f6e..00babcce2 100644 --- a/postgres-types/Cargo.toml +++ b/postgres-types/Cargo.toml @@ -22,7 +22,7 @@ with-uuid-0_8 = ["uuid-08"] with-time-0_2 = ["time-02"] [dependencies] -bytes = "0.5" +bytes = "1.0" fallible-iterator = "0.2" postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" } postgres-derive = { version = "0.4.0", optional = true, path = "../postgres-derive" } diff --git a/postgres-types/src/serde_json_1.rs b/postgres-types/src/serde_json_1.rs index e0fecb496..b98d561d1 100644 --- a/postgres-types/src/serde_json_1.rs +++ b/postgres-types/src/serde_json_1.rs @@ -1,5 +1,4 @@ use crate::{FromSql, IsNull, ToSql, Type}; -use bytes::buf::BufMutExt; use bytes::{BufMut, BytesMut}; use serde_1::{Deserialize, Serialize}; use serde_json_1::Value; diff --git a/postgres/Cargo.toml b/postgres/Cargo.toml index b84becd19..52ba17a47 100644 --- a/postgres/Cargo.toml +++ b/postgres/Cargo.toml @@ -31,12 +31,12 @@ with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"] with-time-0_2 = ["tokio-postgres/with-time-0_2"] [dependencies] -bytes = "0.5" +bytes = "1.0" fallible-iterator = "0.2" futures = "0.3" tokio-postgres = { version = "0.6.0", path = "../tokio-postgres" } -tokio = { version = "0.3", features = ["rt", "time"] } +tokio = { version = "1.0", features = ["rt", "time"] } log = "0.4" [dev-dependencies] diff --git a/postgres/src/copy_out_reader.rs b/postgres/src/copy_out_reader.rs index fd9c27fb0..e8b478d49 100644 --- a/postgres/src/copy_out_reader.rs +++ b/postgres/src/copy_out_reader.rs @@ -46,7 +46,7 @@ impl BufRead for CopyOutReader<'_> { }; } - Ok(self.cur.bytes()) + Ok(&self.cur) } fn consume(&mut self, amt: usize) { diff --git a/postgres/src/notifications.rs b/postgres/src/notifications.rs index 241c95a5d..ea44c31f8 100644 --- a/postgres/src/notifications.rs +++ b/postgres/src/notifications.rs @@ -4,6 +4,7 @@ use crate::connection::ConnectionRef; use crate::{Error, Notification}; use fallible_iterator::FallibleIterator; use futures::{ready, FutureExt}; +use std::pin::Pin; use std::task::Poll; use std::time::Duration; use tokio::time::{self, Instant, Sleep}; @@ -64,7 +65,7 @@ impl<'a> Notifications<'a> { /// This iterator may start returning `Some` after previously returning `None` if more notifications are received. pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> { TimeoutIter { - delay: self.connection.enter(|| time::sleep(timeout)), + delay: Box::pin(self.connection.enter(|| time::sleep(timeout))), timeout, connection: self.connection.as_ref(), } @@ -124,7 +125,7 @@ impl<'a> FallibleIterator for BlockingIter<'a> { /// A time-limited blocking iterator over pending notifications. pub struct TimeoutIter<'a> { connection: ConnectionRef<'a>, - delay: Sleep, + delay: Pin>, timeout: Duration, } @@ -134,7 +135,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> { fn next(&mut self) -> Result, Self::Error> { if let Some(notification) = self.connection.notifications_mut().pop_front() { - self.delay.reset(Instant::now() + self.timeout); + self.delay.as_mut().reset(Instant::now() + self.timeout); return Ok(Some(notification)); } @@ -143,7 +144,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> { self.connection.poll_block_on(|cx, notifications, done| { match notifications.pop_front() { Some(notification) => { - delay.reset(Instant::now() + timeout); + delay.as_mut().reset(Instant::now() + timeout); return Poll::Ready(Ok(Some(notification))); } None if done => return Poll::Ready(Ok(None)), diff --git a/tokio-postgres/Cargo.toml b/tokio-postgres/Cargo.toml index f659663e1..1c82f7da8 100644 --- a/tokio-postgres/Cargo.toml +++ b/tokio-postgres/Cargo.toml @@ -38,7 +38,7 @@ with-time-0_2 = ["postgres-types/with-time-0_2"] [dependencies] async-trait = "0.1" -bytes = "0.5" +bytes = "1.0" byteorder = "1.0" fallible-iterator = "0.2" futures = "0.3" @@ -50,11 +50,11 @@ phf = "0.8" postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" } postgres-types = { version = "0.1.2", path = "../postgres-types" } socket2 = "0.3" -tokio = { version = "0.3", features = ["io-util"] } -tokio-util = { version = "0.4", features = ["codec"] } +tokio = { version = "1.0", features = ["io-util"] } +tokio-util = { version = "0.6", features = ["codec"] } [dev-dependencies] -tokio = { version = "0.3", features = ["full"] } +tokio = { version = "1.0", features = ["full"] } env_logger = "0.8" criterion = "0.3" diff --git a/tokio-postgres/src/binary_copy.rs b/tokio-postgres/src/binary_copy.rs index 20064c728..3b1e13cd7 100644 --- a/tokio-postgres/src/binary_copy.rs +++ b/tokio-postgres/src/binary_copy.rs @@ -153,7 +153,7 @@ impl Stream for BinaryCopyOutStream { Some(header) => header.has_oids, None => { check_remaining(&chunk, HEADER_LEN)?; - if &chunk.bytes()[..MAGIC.len()] != MAGIC { + if !chunk.chunk().starts_with(MAGIC) { return Poll::Ready(Some(Err(Error::parse(io::Error::new( io::ErrorKind::InvalidData, "invalid magic value", diff --git a/tokio-postgres/src/copy_in.rs b/tokio-postgres/src/copy_in.rs index fc712f6db..bc90e5277 100644 --- a/tokio-postgres/src/copy_in.rs +++ b/tokio-postgres/src/copy_in.rs @@ -2,7 +2,6 @@ use crate::client::{InnerClient, Responses}; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::{query, slice_iter, Error, Statement}; -use bytes::buf::BufExt; use bytes::{Buf, BufMut, BytesMut}; use futures::channel::mpsc; use futures::future;