diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index 111bf94d804c..3e7d6fa29230 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -44,11 +44,11 @@ bytes = { version = "1", default-features = false } futures = { version = "0.3", default-features = false, features = ["alloc"] } once_cell = { version = "1", optional = true } paste = { version = "1.0" } -prost = { version = "0.12.3", default-features = false, features = ["prost-derive"] } +prost = { version = "0.13.1", default-features = false, features = ["prost-derive"] } # For Timestamp type -prost-types = { version = "0.12.3", default-features = false } +prost-types = { version = "0.13.1", default-features = false } tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] } -tonic = { version = "0.11.0", default-features = false, features = ["transport", "codegen", "prost"] } +tonic = { version = "0.12.0", default-features = false, features = ["transport", "codegen", "prost"] } # CLI-related dependencies anyhow = { version = "1.0", optional = true } @@ -70,8 +70,9 @@ cli = ["anyhow", "arrow-cast/prettyprint", "clap", "tracing-log", "tracing-subsc [dev-dependencies] arrow-cast = { workspace = true, features = ["prettyprint"] } assert_cmd = "2.0.8" -http = "0.2.9" -http-body = "0.4.5" +http = "1.1.0" +http-body = "1.0.0" +hyper-util = "0.1" pin-project-lite = "0.2" tempfile = "3.3" tokio-stream = { version = "0.1", features = ["net"] } diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs index 031628eaa833..d5168debc433 100644 --- a/arrow-flight/examples/flight_sql_server.rs +++ b/arrow-flight/examples/flight_sql_server.rs @@ -783,7 +783,8 @@ impl ProstMessageExt for FetchResults { #[cfg(test)] mod tests { use super::*; - use futures::TryStreamExt; + use futures::{TryFutureExt, TryStreamExt}; + use hyper_util::rt::TokioIo; use std::fs; use std::future::Future; use std::net::SocketAddr; @@ -843,7 +844,8 @@ mod tests { .serve_with_incoming(stream); let request_future = async { - let connector = service_fn(move |_| UnixStream::connect(path.clone())); + let connector = + service_fn(move |_| UnixStream::connect(path.clone()).map_ok(TokioIo::new)); let channel = Endpoint::try_from("http://example.com") .unwrap() .connect_with_connector(connector) diff --git a/arrow-flight/gen/Cargo.toml b/arrow-flight/gen/Cargo.toml index 7264a527ca8d..a12c683776b4 100644 --- a/arrow-flight/gen/Cargo.toml +++ b/arrow-flight/gen/Cargo.toml @@ -33,5 +33,5 @@ publish = false # Pin specific version of the tonic-build dependencies to avoid auto-generated # (and checked in) arrow.flight.protocol.rs from changing proc-macro2 = { version = "=1.0.86", default-features = false } -prost-build = { version = "=0.12.6", default-features = false } -tonic-build = { version = "=0.11.0", default-features = false, features = ["transport", "prost"] } +prost-build = { version = "=0.13.1", default-features = false } +tonic-build = { version = "=0.12.0", default-features = false, features = ["transport", "prost"] } diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index bc314de9d19f..8c7292894eab 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -38,7 +38,7 @@ pub struct BasicAuth { pub password: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Empty {} /// /// Describes an available action, including both the name used for execution @@ -103,7 +103,7 @@ pub struct Result { /// /// The result should be stored in Result.body. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CancelFlightInfoResult { #[prost(enumeration = "CancelStatus", tag = "1")] pub status: i32, @@ -1053,19 +1053,17 @@ pub mod flight_service_server { /// can expose a set of actions that are available. #[derive(Debug)] pub struct FlightServiceServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl FlightServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -1128,7 +1126,6 @@ pub mod flight_service_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/arrow.flight.protocol.FlightService/Handshake" => { #[allow(non_camel_case_types)] @@ -1162,7 +1159,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = HandshakeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1209,7 +1205,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ListFlightsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1255,7 +1250,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetFlightInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1302,7 +1296,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = PollFlightInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1348,7 +1341,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetSchemaSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1395,7 +1387,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = DoGetSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1442,7 +1433,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = DoPutSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1489,7 +1479,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = DoExchangeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1536,7 +1525,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = DoActionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1583,7 +1571,6 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ListActionsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1605,8 +1592,11 @@ pub mod flight_service_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -1627,16 +1617,6 @@ pub mod flight_service_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for FlightServiceServer { const NAME: &'static str = "arrow.flight.protocol.FlightService"; } diff --git a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs index c1f0fac0f6ba..5e6f198df75c 100644 --- a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs +++ b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs @@ -101,7 +101,7 @@ pub struct CommandGetSqlInfo { /// > /// The returned data should be ordered by data_type and then by type_name. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CommandGetXdbcTypeInfo { /// /// Specifies the data type to search for the info. @@ -121,7 +121,7 @@ pub struct CommandGetXdbcTypeInfo { /// > /// The returned data should be ordered by catalog_name. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CommandGetCatalogs {} /// /// Represents a request to retrieve the list of database schemas on a Flight SQL enabled backend. @@ -232,7 +232,7 @@ pub struct CommandGetTables { /// > /// The returned data should be ordered by table_type. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct CommandGetTableTypes {} /// /// Represents a request to retrieve the primary keys of a table on a Flight SQL enabled backend. @@ -511,7 +511,7 @@ pub struct ActionClosePreparedStatementRequest { /// Request message for the "BeginTransaction" action. /// Begins a transaction. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ActionBeginTransactionRequest {} /// /// Request message for the "BeginSavepoint" action. @@ -802,7 +802,7 @@ pub struct CommandPreparedStatementUpdate { /// CommandPreparedStatementUpdate was in the request, containing /// results from the update. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct DoPutUpdateResult { /// The number of records updated. A return value of -1 represents /// an unknown updated record count. @@ -862,7 +862,7 @@ pub struct ActionCancelQueryRequest { /// This command is deprecated since 13.0.0. Use the "CancelFlightInfo" /// action with DoAction instead. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ActionCancelQueryResult { #[prost(enumeration = "action_cancel_query_result::CancelResult", tag = "1")] pub result: i32, diff --git a/arrow-flight/tests/common/trailers_layer.rs b/arrow-flight/tests/common/trailers_layer.rs index b2ab74f7d925..0ccb7df86c74 100644 --- a/arrow-flight/tests/common/trailers_layer.rs +++ b/arrow-flight/tests/common/trailers_layer.rs @@ -21,7 +21,7 @@ use std::task::{Context, Poll}; use futures::ready; use http::{HeaderValue, Request, Response}; -use http_body::SizeHint; +use http_body::{Frame, SizeHint}; use pin_project_lite::pin_project; use tower::{Layer, Service}; @@ -99,31 +99,19 @@ impl http_body::Body for WrappedBody { type Data = B::Data; type Error = B::Error; - fn poll_data( - mut self: Pin<&mut Self>, + fn poll_frame( + self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - self.as_mut().project().inner.poll_data(cx) - } - - fn poll_trailers( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let result: Result, Self::Error> = - ready!(self.as_mut().project().inner.poll_trailers(cx)); - - let mut trailers = http::header::HeaderMap::new(); - trailers.insert("test-trailer", HeaderValue::from_static("trailer_val")); + ) -> Poll, Self::Error>>> { + let mut result = ready!(self.project().inner.poll_frame(cx)); - match result { - Ok(Some(mut existing)) => { - existing.extend(trailers.iter().map(|(k, v)| (k.clone(), v.clone()))); - Poll::Ready(Ok(Some(existing))) + if let Some(Ok(frame)) = &mut result { + if let Some(trailers) = frame.trailers_mut() { + trailers.insert("test-trailer", HeaderValue::from_static("trailer_val")); } - Ok(None) => Poll::Ready(Ok(Some(trailers))), - Err(e) => Poll::Ready(Err(e)), } + + Poll::Ready(result) } fn is_end_stream(&self) -> bool { diff --git a/arrow-integration-testing/Cargo.toml b/arrow-integration-testing/Cargo.toml index 032b99f4fbbb..7be56d919852 100644 --- a/arrow-integration-testing/Cargo.toml +++ b/arrow-integration-testing/Cargo.toml @@ -42,11 +42,11 @@ async-trait = { version = "0.1.41", default-features = false } clap = { version = "4", default-features = false, features = ["std", "derive", "help", "error-context", "usage"] } futures = { version = "0.3", default-features = false } hex = { version = "0.4", default-features = false, features = ["std"] } -prost = { version = "0.12", default-features = false } +prost = { version = "0.13", default-features = false } serde = { version = "1.0", default-features = false, features = ["rc", "derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] } tokio = { version = "1.0", default-features = false } -tonic = { version = "0.11", default-features = false } +tonic = { version = "0.12", default-features = false } tracing-subscriber = { version = "0.3.1", default-features = false, features = ["fmt"], optional = true } num = { version = "0.4", default-features = false, features = ["std"] } flate2 = { version = "1", default-features = false, features = ["rust_backend"] }