From f8f775d3cee4fb8f6bd8b6316368dd4cfd8416de Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Tue, 20 Feb 2024 22:43:21 +1100 Subject: [PATCH] Update Flight proto: PollFlightInfo & expiration time --- arrow-flight/Cargo.toml | 4 +- arrow-flight/examples/flight_sql_server.rs | 2 + arrow-flight/examples/server.rs | 9 +- arrow-flight/src/arrow.flight.protocol.rs | 266 ++++++++++++++++++ arrow-flight/src/client.rs | 144 +++++++++- arrow-flight/src/lib.rs | 184 +++++++++++- .../src/sql/arrow.flight.protocol.sql.rs | 36 ++- arrow-flight/src/sql/client.rs | 1 + arrow-flight/src/sql/server.rs | 13 +- arrow-flight/tests/client.rs | 147 +++++++++- arrow-flight/tests/common/server.rs | 47 +++- .../src/flight_server_scenarios.rs | 2 + .../auth_basic_proto.rs | 10 +- .../integration_test.rs | 10 +- .../src/flight_server_scenarios/middleware.rs | 11 +- format/Flight.proto | 152 +++++++++- format/FlightSql.proto | 42 ++- 17 files changed, 1026 insertions(+), 54 deletions(-) diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index 81940b7600ee..1e7255baa748 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -44,7 +44,9 @@ bytes = { version = "1", default-features = false } futures = { version = "0.3", default-features = false, features = ["alloc"] } once_cell = { version = "1", optional = true } paste = { version = "1.0" } -prost = { version = "0.12.1", default-features = false, features = ["prost-derive"] } +prost = { version = "0.12.3", default-features = false, features = ["prost-derive"] } +# For Timestamp type +prost-types = { version = "0.12.3", default-features = false } tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] } tonic = { version = "0.11.0", default-features = false, features = ["transport", "codegen", "prost"] } diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs index bd94d3c499ca..85f5c7499346 100644 --- a/arrow-flight/examples/flight_sql_server.rs +++ b/arrow-flight/examples/flight_sql_server.rs @@ -249,6 +249,8 @@ impl FlightSqlService for FlightSqlServiceImpl { let endpoint = FlightEndpoint { ticket: Some(ticket), location: vec![loc], + expiration_time: None, + app_metadata: vec![].into(), }; let info = FlightInfo::new() .try_with_schema(&schema) diff --git a/arrow-flight/examples/server.rs b/arrow-flight/examples/server.rs index 85ac4ca1384c..8c766b075957 100644 --- a/arrow-flight/examples/server.rs +++ b/arrow-flight/examples/server.rs @@ -22,7 +22,7 @@ use tonic::{Request, Response, Status, Streaming}; use arrow_flight::{ flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, - HandshakeResponse, PutResult, SchemaResult, Ticket, + HandshakeResponse, PollInfo, PutResult, SchemaResult, Ticket, }; #[derive(Clone)] @@ -59,6 +59,13 @@ impl FlightService for FlightServiceImpl { Err(Status::unimplemented("Implement get_flight_info")) } + async fn poll_flight_info( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Implement poll_flight_info")) + } + async fn get_schema( &self, _request: Request, diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index e76013bd7c5f..67265e781786 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -70,6 +70,26 @@ pub struct Action { pub body: ::prost::bytes::Bytes, } /// +/// The request of the CancelFlightInfo action. +/// +/// The request should be stored in Action.body. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CancelFlightInfoRequest { + #[prost(message, optional, tag = "1")] + pub info: ::core::option::Option, +} +/// +/// The request of the RenewFlightEndpoint action. +/// +/// The request should be stored in Action.body. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RenewFlightEndpointRequest { + #[prost(message, optional, tag = "1")] + pub endpoint: ::core::option::Option, +} +/// /// An opaque result returned after executing an action. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -78,6 +98,16 @@ pub struct Result { pub body: ::prost::bytes::Bytes, } /// +/// The result of the CancelFlightInfo action. +/// +/// The result should be stored in Result.body. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CancelFlightInfoResult { + #[prost(enumeration = "CancelStatus", tag = "1")] + pub status: i32, +} +/// /// Wrap the result of a getSchema call #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -209,6 +239,57 @@ pub struct FlightInfo { /// FlightEndpoints are in the same order as the data. #[prost(bool, tag = "6")] pub ordered: bool, + /// + /// Application-defined metadata. + /// + /// There is no inherent or required relationship between this + /// and the app_metadata fields in the FlightEndpoints or resulting + /// FlightData messages. Since this metadata is application-defined, + /// a given application could define there to be a relationship, + /// but there is none required by the spec. + #[prost(bytes = "bytes", tag = "7")] + pub app_metadata: ::prost::bytes::Bytes, +} +/// +/// The information to process a long-running query. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PollInfo { + /// + /// The currently available results. + /// + /// If "flight_descriptor" is not specified, the query is complete + /// and "info" specifies all results. Otherwise, "info" contains + /// partial query results. + /// + /// Note that each PollInfo response contains a complete + /// FlightInfo (not just the delta between the previous and current + /// FlightInfo). + /// + /// Subsequent PollInfo responses may only append new endpoints to + /// info. + /// + /// Clients can begin fetching results via DoGet(Ticket) with the + /// ticket in the info before the query is + /// completed. FlightInfo.ordered is also valid. + #[prost(message, optional, tag = "1")] + pub info: ::core::option::Option, + /// + /// The descriptor the client should use on the next try. + /// If unset, the query is complete. + #[prost(message, optional, tag = "2")] + pub flight_descriptor: ::core::option::Option, + /// + /// Query progress. If known, must be in \[0.0, 1.0\] but need not be + /// monotonic or nondecreasing. If unknown, do not set. + #[prost(double, optional, tag = "3")] + pub progress: ::core::option::Option, + /// + /// Expiration time for this request. After this passes, the server + /// might not accept the retry descriptor anymore (and the query may + /// be cancelled). This may be updated on a call to PollFlightInfo. + #[prost(message, optional, tag = "4")] + pub expiration_time: ::core::option::Option<::prost_types::Timestamp>, } /// /// A particular stream or split associated with a flight. @@ -236,6 +317,22 @@ pub struct FlightEndpoint { /// represent redundant and/or load balanced services. #[prost(message, repeated, tag = "2")] pub location: ::prost::alloc::vec::Vec, + /// + /// Expiration time of this stream. If present, clients may assume + /// they can retry DoGet requests. Otherwise, it is + /// application-defined whether DoGet requests may be retried. + #[prost(message, optional, tag = "3")] + pub expiration_time: ::core::option::Option<::prost_types::Timestamp>, + /// + /// Application-defined metadata. + /// + /// There is no inherent or required relationship between this + /// and the app_metadata fields in the FlightInfo or resulting + /// FlightData messages. Since this metadata is application-defined, + /// a given application could define there to be a relationship, + /// but there is none required by the spec. + #[prost(bytes = "bytes", tag = "4")] + pub app_metadata: ::prost::bytes::Bytes, } /// /// A location where a Flight service will accept retrieval of a particular @@ -292,6 +389,51 @@ pub struct PutResult { #[prost(bytes = "bytes", tag = "1")] pub app_metadata: ::prost::bytes::Bytes, } +/// +/// The result of a cancel operation. +/// +/// This is used by CancelFlightInfoResult.status. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum CancelStatus { + /// The cancellation status is unknown. Servers should avoid using + /// this value (send a NOT_FOUND error if the requested query is + /// not known). Clients can retry the request. + Unspecified = 0, + /// The cancellation request is complete. Subsequent requests with + /// the same payload may return CANCELLED or a NOT_FOUND error. + Cancelled = 1, + /// The cancellation request is in progress. The client may retry + /// the cancellation request. + Cancelling = 2, + /// The query is not cancellable. The client should not retry the + /// cancellation request. + NotCancellable = 3, +} +impl CancelStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + CancelStatus::Unspecified => "CANCEL_STATUS_UNSPECIFIED", + CancelStatus::Cancelled => "CANCEL_STATUS_CANCELLED", + CancelStatus::Cancelling => "CANCEL_STATUS_CANCELLING", + CancelStatus::NotCancellable => "CANCEL_STATUS_NOT_CANCELLABLE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CANCEL_STATUS_UNSPECIFIED" => Some(Self::Unspecified), + "CANCEL_STATUS_CANCELLED" => Some(Self::Cancelled), + "CANCEL_STATUS_CANCELLING" => Some(Self::Cancelling), + "CANCEL_STATUS_NOT_CANCELLABLE" => Some(Self::NotCancellable), + _ => None, + } + } +} /// Generated client implementations. pub mod flight_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] @@ -487,6 +629,56 @@ pub mod flight_service_client { self.inner.unary(req, path, codec).await } /// + /// For a given FlightDescriptor, start a query and get information + /// to poll its execution status. This is a useful interface if the + /// query may be a long-running query. The first PollFlightInfo call + /// should return as quickly as possible. (GetFlightInfo doesn't + /// return until the query is complete.) + /// + /// A client can consume any available results before + /// the query is completed. See PollInfo.info for details. + /// + /// A client can poll the updated query status by calling + /// PollFlightInfo() with PollInfo.flight_descriptor. A server + /// should not respond until the result would be different from last + /// time. That way, the client can "long poll" for updates + /// without constantly making requests. Clients can set a short timeout + /// to avoid blocking calls if desired. + /// + /// A client can't use PollInfo.flight_descriptor after + /// PollInfo.expiration_time passes. A server might not accept the + /// retry descriptor anymore and the query may be cancelled. + /// + /// A client may use the CancelFlightInfo action with + /// PollInfo.info to cancel the running query. + pub async fn poll_flight_info( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/arrow.flight.protocol.FlightService/PollFlightInfo", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "arrow.flight.protocol.FlightService", + "PollFlightInfo", + ), + ); + self.inner.unary(req, path, codec).await + } + /// /// For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema /// This is used when a consumer needs the Schema of flight stream. Similar to /// GetFlightInfo this interface may generate a new flight that was not previously @@ -735,6 +927,33 @@ pub mod flight_service_server { request: tonic::Request, ) -> std::result::Result, tonic::Status>; /// + /// For a given FlightDescriptor, start a query and get information + /// to poll its execution status. This is a useful interface if the + /// query may be a long-running query. The first PollFlightInfo call + /// should return as quickly as possible. (GetFlightInfo doesn't + /// return until the query is complete.) + /// + /// A client can consume any available results before + /// the query is completed. See PollInfo.info for details. + /// + /// A client can poll the updated query status by calling + /// PollFlightInfo() with PollInfo.flight_descriptor. A server + /// should not respond until the result would be different from last + /// time. That way, the client can "long poll" for updates + /// without constantly making requests. Clients can set a short timeout + /// to avoid blocking calls if desired. + /// + /// A client can't use PollInfo.flight_descriptor after + /// PollInfo.expiration_time passes. A server might not accept the + /// retry descriptor anymore and the query may be cancelled. + /// + /// A client may use the CancelFlightInfo action with + /// PollInfo.info to cancel the running query. + async fn poll_flight_info( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// /// For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema /// This is used when a consumer needs the Schema of flight stream. Similar to /// GetFlightInfo this interface may generate a new flight that was not previously @@ -1052,6 +1271,53 @@ pub mod flight_service_server { }; Box::pin(fut) } + "/arrow.flight.protocol.FlightService/PollFlightInfo" => { + #[allow(non_camel_case_types)] + struct PollFlightInfoSvc(pub Arc); + impl< + T: FlightService, + > tonic::server::UnaryService + for PollFlightInfoSvc { + type Response = super::PollInfo; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::poll_flight_info(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PollFlightInfoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/arrow.flight.protocol.FlightService/GetSchema" => { #[allow(non_camel_case_types)] struct GetSchemaSvc(pub Arc); diff --git a/arrow-flight/src/client.rs b/arrow-flight/src/client.rs index a264012c82ec..b2abfb0c17b2 100644 --- a/arrow-flight/src/client.rs +++ b/arrow-flight/src/client.rs @@ -18,9 +18,12 @@ use std::task::Poll; use crate::{ - decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient, - trailers::extract_lazy_trailers, Action, ActionType, Criteria, Empty, FlightData, - FlightDescriptor, FlightInfo, HandshakeRequest, PutResult, Ticket, + decode::FlightRecordBatchStream, + flight_service_client::FlightServiceClient, + gen::{CancelFlightInfoRequest, CancelFlightInfoResult, RenewFlightEndpointRequest}, + trailers::extract_lazy_trailers, + Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, + HandshakeRequest, PollInfo, PutResult, Ticket, }; use arrow_schema::Schema; use bytes::Bytes; @@ -30,6 +33,7 @@ use futures::{ stream::{self, BoxStream}, FutureExt, Stream, StreamExt, TryStreamExt, }; +use prost::Message; use tonic::{metadata::MetadataMap, transport::Channel}; use crate::error::{FlightError, Result}; @@ -256,6 +260,64 @@ impl FlightClient { Ok(response) } + /// Make a `PollFlightInfo` call to the server with the provided + /// [`FlightDescriptor`] and return the [`PollInfo`] from the + /// server. + /// + /// The `info` field of the [`PollInfo`] can be used with + /// [`Self::do_get`] to retrieve the requested batches. + /// + /// If the `flight_descriptor` field of the [`PollInfo`] is + /// `None` then the `info` field represents the complete results. + /// + /// If the `flight_descriptor` field is some [`FlightDescriptor`] + /// then the `info` field has incomplete results, and the client + /// should call this method again with the new `flight_descriptor` + /// to get the updated status. + /// + /// The `expiration_time`, if set, represents the expiration time + /// of the `flight_descriptor`, after which the server may not accept + /// this retry descriptor and may cancel the query. + /// + /// # Example: + /// ```no_run + /// # async fn run() { + /// # use arrow_flight::FlightClient; + /// # use arrow_flight::FlightDescriptor; + /// # let channel: tonic::transport::Channel = unimplemented!(); + /// let mut client = FlightClient::new(channel); + /// + /// // Send a 'CMD' request to the server + /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec()); + /// let poll_info = client + /// .poll_flight_info(request) + /// .await + /// .expect("error handshaking"); + /// + /// // retrieve the first endpoint from the returned poll info + /// let ticket = poll_info + /// .info + /// .expect("expected flight info") + /// .endpoint[0] + /// // Extract the ticket + /// .ticket + /// .clone() + /// .expect("expected ticket"); + /// + /// // Retrieve the corresponding RecordBatch stream with do_get + /// let data = client + /// .do_get(ticket) + /// .await + /// .expect("error fetching data"); + /// # } + /// ``` + pub async fn poll_flight_info(&mut self, descriptor: FlightDescriptor) -> Result { + let request = self.make_request(descriptor); + + let response = self.inner.poll_flight_info(request).await?.into_inner(); + Ok(response) + } + /// Make a `DoPut` call to the server with the provided /// [`Stream`] of [`FlightData`] and returning a /// stream of [`PutResult`]. @@ -540,6 +602,82 @@ impl FlightClient { Ok(result_stream.boxed()) } + /// Make a `CancelFlightInfo` call to the server and return + /// a [`CancelFlightInfoResult`]. + /// + /// # Example: + /// ```no_run + /// # async fn run() { + /// # use arrow_flight::{CancelFlightInfoRequest, FlightClient, FlightDescriptor}; + /// # let channel: tonic::transport::Channel = unimplemented!(); + /// let mut client = FlightClient::new(channel); + /// + /// // Send a 'CMD' request to the server + /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec()); + /// let flight_info = client + /// .get_flight_info(request) + /// .await + /// .expect("error handshaking"); + /// + /// // Cancel the query + /// let request = CancelFlightInfoRequest::new(flight_info); + /// let result = client + /// .cancel_flight_info(request) + /// .await + /// .expect("error cancelling"); + /// # } + /// ``` + pub async fn cancel_flight_info( + &mut self, + request: CancelFlightInfoRequest, + ) -> Result { + let action = Action::new("CancelFlightInfo", request.encode_to_vec()); + let response = self.do_action(action).await?.try_next().await?; + let response = response.ok_or(FlightError::protocol( + "Received no response for cancel_flight_info call", + ))?; + CancelFlightInfoResult::decode(response) + .map_err(|e| FlightError::DecodeError(e.to_string())) + } + + /// Make a `RenewFlightEndpoint` call to the server and return + /// the renewed [`FlightEndpoint`]. + /// + /// # Example: + /// ```no_run + /// # async fn run() { + /// # use arrow_flight::{FlightClient, FlightDescriptor, RenewFlightEndpointRequest}; + /// # let channel: tonic::transport::Channel = unimplemented!(); + /// let mut client = FlightClient::new(channel); + /// + /// // Send a 'CMD' request to the server + /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec()); + /// let flight_endpoint = client + /// .get_flight_info(request) + /// .await + /// .expect("error handshaking") + /// .endpoint[0]; + /// + /// // Renew the endpoint + /// let request = RenewFlightEndpointRequest::new(flight_endpoint); + /// let flight_endpoint = client + /// .renew_flight_endpoint(request) + /// .await + /// .expect("error renewing"); + /// # } + /// ``` + pub async fn renew_flight_endpoint( + &mut self, + request: RenewFlightEndpointRequest, + ) -> Result { + let action = Action::new("RenewFlightEndpoint", request.encode_to_vec()); + let response = self.do_action(action).await?.try_next().await?; + let response = response.ok_or(FlightError::protocol( + "Received no response for renew_flight_endpoint call", + ))?; + FlightEndpoint::decode(response).map_err(|e| FlightError::DecodeError(e.to_string())) + } + /// return a Request, adding any configured metadata fn make_request(&self, t: T) -> tonic::Request { // Pass along metadata diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs index 8d05f658703a..434d19ce76fe 100644 --- a/arrow-flight/src/lib.rs +++ b/arrow-flight/src/lib.rs @@ -45,6 +45,7 @@ use arrow_ipc::convert::try_schema_from_ipc_buffer; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::Bytes; +use prost_types::Timestamp; use std::{ convert::{TryFrom, TryInto}, fmt, @@ -97,6 +98,9 @@ pub mod error; pub use gen::Action; pub use gen::ActionType; pub use gen::BasicAuth; +pub use gen::CancelFlightInfoRequest; +pub use gen::CancelFlightInfoResult; +pub use gen::CancelStatus; pub use gen::Criteria; pub use gen::Empty; pub use gen::FlightData; @@ -106,7 +110,9 @@ pub use gen::FlightInfo; pub use gen::HandshakeRequest; pub use gen::HandshakeResponse; pub use gen::Location; +pub use gen::PollInfo; pub use gen::PutResult; +pub use gen::RenewFlightEndpointRequest; pub use gen::Result; pub use gen::SchemaResult; pub use gen::Ticket; @@ -225,7 +231,7 @@ impl fmt::Display for FlightEndpoint { write!(f, " ticket: ")?; match &self.ticket { Some(value) => write!(f, "{value}"), - None => write!(f, " none"), + None => write!(f, " None"), }?; write!(f, ", location: [")?; let mut sep = ""; @@ -234,6 +240,13 @@ impl fmt::Display for FlightEndpoint { sep = ", "; } write!(f, "]")?; + write!(f, ", expiration_time:")?; + match &self.expiration_time { + Some(value) => write!(f, " {value}"), + None => write!(f, " None"), + }?; + write!(f, ", app_metadata: ")?; + limited_fmt(f, &self.app_metadata, 8)?; write!(f, " }}") } } @@ -257,6 +270,68 @@ impl fmt::Display for FlightInfo { } write!(f, "], total_records: {}", self.total_records)?; write!(f, ", total_bytes: {}", self.total_bytes)?; + write!(f, ", ordered: {}", self.ordered)?; + write!(f, ", app_metadata: ")?; + limited_fmt(f, &self.app_metadata, 8)?; + write!(f, " }}") + } +} + +impl fmt::Display for PollInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "PollInfo {{")?; + write!(f, " info:")?; + match &self.info { + Some(value) => write!(f, " {value}"), + None => write!(f, " None"), + }?; + write!(f, ", descriptor:")?; + match &self.flight_descriptor { + Some(d) => write!(f, " {d}"), + None => write!(f, " None"), + }?; + write!(f, ", progress:")?; + match &self.progress { + Some(value) => write!(f, " {value}"), + None => write!(f, " None"), + }?; + write!(f, ", expiration_time:")?; + match &self.expiration_time { + Some(value) => write!(f, " {value}"), + None => write!(f, " None"), + }?; + write!(f, " }}") + } +} + +impl fmt::Display for CancelFlightInfoRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CancelFlightInfoRequest {{")?; + write!(f, " info: ")?; + match &self.info { + Some(value) => write!(f, "{value}")?, + None => write!(f, "None")?, + }; + write!(f, " }}") + } +} + +impl fmt::Display for CancelFlightInfoResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CancelFlightInfoResult {{")?; + write!(f, " status: {}", self.status().as_str_name())?; + write!(f, " }}") + } +} + +impl fmt::Display for RenewFlightEndpointRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "RenewFlightEndpointRequest {{")?; + write!(f, " endpoint: ")?; + match &self.endpoint { + Some(value) => write!(f, "{value}")?, + None => write!(f, "None")?, + }; write!(f, " }}") } } @@ -472,9 +547,6 @@ impl FlightInfo { /// // Encode the Arrow schema /// .try_with_schema(&get_schema()) /// .expect("encoding failed") - /// .with_descriptor( - /// FlightDescriptor::new_cmd("a command") - /// ) /// .with_endpoint( /// FlightEndpoint::new() /// .with_ticket(Ticket::new("ticket contents") @@ -493,6 +565,7 @@ impl FlightInfo { // https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L287-L289 total_records: -1, total_bytes: -1, + app_metadata: Bytes::new(), } } @@ -546,6 +619,70 @@ impl FlightInfo { self.ordered = ordered; self } + + /// Add optional application specific metadata to the message + pub fn with_app_metadata(mut self, app_metadata: impl Into) -> Self { + self.app_metadata = app_metadata.into(); + self + } +} + +impl PollInfo { + /// Create a new, empty [`PollInfo`], providing information for a long-running query + /// + /// # Example: + /// ``` + /// # use arrow_flight::{FlightInfo, PollInfo, FlightDescriptor}; + /// # use prost_types::Timestamp; + /// // Create a new PollInfo + /// let poll_info = PollInfo::new() + /// .with_info(FlightInfo::new()) + /// .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY")) + /// .try_with_progress(0.5) + /// .expect("progress should've been valid") + /// .with_expiration_time( + /// "1970-01-01".parse().expect("invalid timestamp") + /// ); + /// ``` + pub fn new() -> Self { + Self { + info: None, + flight_descriptor: None, + progress: None, + expiration_time: None, + } + } + + /// Add the current available results for the poll call as a [`FlightInfo`] + pub fn with_info(mut self, info: FlightInfo) -> Self { + self.info = Some(info); + self + } + + /// Add a [`FlightDescriptor`] that the client should use for the next poll call, + /// if the query is not yet complete + pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self { + self.flight_descriptor = Some(flight_descriptor); + self + } + + /// Set the query progress if known. Must be in the range [0.0, 1.0] else this will + /// return an error + pub fn try_with_progress(mut self, progress: f64) -> ArrowResult { + if !(0.0..=1.0).contains(&progress) { + return Err(ArrowError::InvalidArgumentError(format!( + "PollInfo progress must be in the range [0.0, 1.0], got {progress}" + ))); + } + self.progress = Some(progress); + Ok(self) + } + + /// Specify expiration time for this request + pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self { + self.expiration_time = Some(expiration_time); + self + } } impl<'a> SchemaAsIpc<'a> { @@ -556,6 +693,33 @@ impl<'a> SchemaAsIpc<'a> { } } +impl CancelFlightInfoRequest { + /// Create a new [`CancelFlightInfoRequest`], providing the [`FlightInfo`] + /// of the query to cancel. + pub fn new(info: FlightInfo) -> Self { + Self { info: Some(info) } + } +} + +impl CancelFlightInfoResult { + /// Create a new [`CancelFlightInfoResult`] from the provided [`CancelStatus`]. + pub fn new(status: CancelStatus) -> Self { + Self { + status: status as i32, + } + } +} + +impl RenewFlightEndpointRequest { + /// Create a new [`RenewFlightEndpointRequest`], providing the [`FlightEndpoint`] + /// for which is being requested an extension of its expiration. + pub fn new(endpoint: FlightEndpoint) -> Self { + Self { + endpoint: Some(endpoint), + } + } +} + impl Action { /// Create a new Action with type and body pub fn new(action_type: impl Into, body: impl Into) -> Self { @@ -633,6 +797,18 @@ impl FlightEndpoint { self.location.push(Location { uri: uri.into() }); self } + + /// Specify expiration time for this stream + pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self { + self.expiration_time = Some(expiration_time); + self + } + + /// Add optional application specific metadata to the message + pub fn with_app_metadata(mut self, app_metadata: impl Into) -> Self { + self.app_metadata = app_metadata.into(); + self + } } #[cfg(test)] diff --git a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs index c7c23311e61e..2b2f4af7ac90 100644 --- a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs +++ b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs @@ -176,7 +176,7 @@ pub struct CommandGetDbSchemas { /// - ARROW:FLIGHT:SQL:PRECISION - Column precision/size /// - ARROW:FLIGHT:SQL:SCALE - Column scale/decimal digits if applicable /// - ARROW:FLIGHT:SQL:IS_AUTO_INCREMENT - "1" indicates if the column is auto incremented, "0" otherwise. -/// - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case sensitive, "0" otherwise. +/// - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case-sensitive, "0" otherwise. /// - ARROW:FLIGHT:SQL:IS_READ_ONLY - "1" indicates if the column is read only, "0" otherwise. /// - ARROW:FLIGHT:SQL:IS_SEARCHABLE - "1" indicates if the column is searchable via WHERE clause, "0" otherwise. /// The returned data should be ordered by catalog_name, db_schema_name, table_name, then table_type, followed by table_schema if requested. @@ -485,11 +485,14 @@ pub struct ActionCreatePreparedStatementResult { #[prost(bytes = "bytes", tag = "1")] pub prepared_statement_handle: ::prost::bytes::Bytes, /// If a result set generating query was provided, dataset_schema contains the - /// schema of the dataset as described in Schema.fbs::Schema, it is serialized as an IPC message. + /// schema of the result set. It should be an IPC-encapsulated Schema, as described in Schema.fbs. + /// For some queries, the schema of the results may depend on the schema of the parameters. The server + /// should provide its best guess as to the schema at this point. Clients must not assume that this + /// schema, if provided, will be accurate. #[prost(bytes = "bytes", tag = "2")] pub dataset_schema: ::prost::bytes::Bytes, /// If the query provided contained parameters, parameter_schema contains the - /// schema of the expected parameters as described in Schema.fbs::Schema, it is serialized as an IPC message. + /// schema of the expected parameters. It should be an IPC-encapsulated Schema, as described in Schema.fbs. #[prost(bytes = "bytes", tag = "3")] pub parameter_schema: ::prost::bytes::Bytes, } @@ -691,7 +694,7 @@ pub mod action_end_savepoint_request { /// - ARROW:FLIGHT:SQL:PRECISION - Column precision/size /// - ARROW:FLIGHT:SQL:SCALE - Column scale/decimal digits if applicable /// - ARROW:FLIGHT:SQL:IS_AUTO_INCREMENT - "1" indicates if the column is auto incremented, "0" otherwise. -/// - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case sensitive, "0" otherwise. +/// - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case-sensitive, "0" otherwise. /// - ARROW:FLIGHT:SQL:IS_READ_ONLY - "1" indicates if the column is read only, "0" otherwise. /// - ARROW:FLIGHT:SQL:IS_SEARCHABLE - "1" indicates if the column is searchable via WHERE clause, "0" otherwise. /// - GetFlightInfo: execute the query. @@ -717,7 +720,7 @@ pub struct CommandStatementQuery { /// - ARROW:FLIGHT:SQL:PRECISION - Column precision/size /// - ARROW:FLIGHT:SQL:SCALE - Column scale/decimal digits if applicable /// - ARROW:FLIGHT:SQL:IS_AUTO_INCREMENT - "1" indicates if the column is auto incremented, "0" otherwise. -/// - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case sensitive, "0" otherwise. +/// - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case-sensitive, "0" otherwise. /// - ARROW:FLIGHT:SQL:IS_READ_ONLY - "1" indicates if the column is read only, "0" otherwise. /// - ARROW:FLIGHT:SQL:IS_SEARCHABLE - "1" indicates if the column is searchable via WHERE clause, "0" otherwise. /// - GetFlightInfo: execute the query. @@ -754,9 +757,12 @@ pub struct TicketStatementQuery { /// - ARROW:FLIGHT:SQL:PRECISION - Column precision/size /// - ARROW:FLIGHT:SQL:SCALE - Column scale/decimal digits if applicable /// - ARROW:FLIGHT:SQL:IS_AUTO_INCREMENT - "1" indicates if the column is auto incremented, "0" otherwise. -/// - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case sensitive, "0" otherwise. +/// - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case-sensitive, "0" otherwise. /// - ARROW:FLIGHT:SQL:IS_READ_ONLY - "1" indicates if the column is read only, "0" otherwise. /// - ARROW:FLIGHT:SQL:IS_SEARCHABLE - "1" indicates if the column is searchable via WHERE clause, "0" otherwise. +/// +/// If the schema is retrieved after parameter values have been bound with DoPut, then the server should account +/// for the parameters when determining the schema. /// - DoPut: bind parameter values. All of the bound parameter sets will be executed as a single atomic execution. /// - GetFlightInfo: execute the prepared statement instance. #[allow(clippy::derive_partial_eq_without_eq)] @@ -768,7 +774,7 @@ pub struct CommandPreparedStatementQuery { } /// /// Represents a SQL update query. Used in the command member of FlightDescriptor -/// for the the RPC call DoPut to cause the server to execute the included SQL update. +/// for the RPC call DoPut to cause the server to execute the included SQL update. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CommandStatementUpdate { @@ -781,7 +787,7 @@ pub struct CommandStatementUpdate { } /// /// Represents a SQL update query. Used in the command member of FlightDescriptor -/// for the the RPC call DoPut to cause the server to execute the included +/// for the RPC call DoPut to cause the server to execute the included /// prepared statement handle as an update. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -815,6 +821,9 @@ pub struct DoPutUpdateResult { /// data. /// /// This command is idempotent. +/// +/// This command is deprecated since 13.0.0. Use the "CancelFlightInfo" +/// action with DoAction instead. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ActionCancelQueryRequest { @@ -829,6 +838,9 @@ pub struct ActionCancelQueryRequest { /// The result of cancelling a query. /// /// The result should be wrapped in a google.protobuf.Any message. +/// +/// This command is deprecated since 13.0.0. Use the "CancelFlightInfo" +/// action with DoAction instead. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ActionCancelQueryResult { @@ -1272,7 +1284,7 @@ pub enum SqlInfo { SqlMaxCharLiteralLength = 542, /// Retrieves a int64 value representing the maximum number of characters allowed for a column name. SqlMaxColumnNameLength = 543, - /// Retrieves a int64 value representing the the maximum number of columns allowed in a GROUP BY clause. + /// Retrieves a int64 value representing the maximum number of columns allowed in a GROUP BY clause. SqlMaxColumnsInGroupBy = 544, /// Retrieves a int64 value representing the maximum number of columns allowed in an index. SqlMaxColumnsInIndex = 545, @@ -2373,7 +2385,7 @@ impl SqlSupportsConvert { } /// * /// The JDBC/ODBC-defined type of any object. -/// All the values here are the sames as in the JDBC and ODBC specs. +/// All the values here are the same as in the JDBC and ODBC specs. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum XdbcDataType { @@ -2621,7 +2633,7 @@ pub enum Nullable { /// Indicates that the fields allow the use of null values. NullabilityNullable = 1, /// * - /// Indicates that nullability of the fields can not be determined. + /// Indicates that nullability of the fields cannot be determined. NullabilityUnknown = 2, } impl Nullable { @@ -2650,7 +2662,7 @@ impl Nullable { #[repr(i32)] pub enum Searchable { /// * - /// Indicates that column can not be used in a WHERE clause. + /// Indicates that column cannot be used in a WHERE clause. None = 0, /// * /// Indicates that the column can be used in a WHERE clause if it is using a diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs index 6448e69e7f30..a014137f6fa9 100644 --- a/arrow-flight/src/sql/client.rs +++ b/arrow-flight/src/sql/client.rs @@ -391,6 +391,7 @@ impl FlightSqlServiceClient { /// Explicitly shut down and clean up the client. pub async fn close(&mut self) -> Result<(), ArrowError> { + // TODO: consume self instead of &mut self to explicitly prevent reuse? Ok(()) } diff --git a/arrow-flight/src/sql/server.rs b/arrow-flight/src/sql/server.rs index f1656aca882a..0431e58111a4 100644 --- a/arrow-flight/src/sql/server.rs +++ b/arrow-flight/src/sql/server.rs @@ -36,9 +36,9 @@ use super::{ DoPutUpdateResult, ProstMessageExt, SqlInfo, TicketStatementQuery, }; use crate::{ - flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, - FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, - Ticket, + flight_service_server::FlightService, gen::PollInfo, Action, ActionType, Criteria, Empty, + FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, + SchemaResult, Ticket, }; pub(crate) static CREATE_PREPARED_STATEMENT: &str = "CreatePreparedStatement"; @@ -632,6 +632,13 @@ where } } + async fn poll_flight_info( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + async fn get_schema( &self, _request: Request, diff --git a/arrow-flight/tests/client.rs b/arrow-flight/tests/client.rs index 3ad9ee7a45ca..9e19bce92338 100644 --- a/arrow-flight/tests/client.rs +++ b/arrow-flight/tests/client.rs @@ -24,13 +24,15 @@ mod common { use arrow_array::{RecordBatch, UInt64Array}; use arrow_flight::{ decode::FlightRecordBatchStream, encode::FlightDataEncoderBuilder, error::FlightError, Action, - ActionType, Criteria, Empty, FlightClient, FlightData, FlightDescriptor, FlightInfo, - HandshakeRequest, HandshakeResponse, PutResult, Ticket, + ActionType, CancelFlightInfoRequest, CancelFlightInfoResult, CancelStatus, Criteria, Empty, + FlightClient, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, + HandshakeResponse, PollInfo, PutResult, RenewFlightEndpointRequest, Ticket, }; use arrow_schema::{DataType, Field, Schema}; use bytes::Bytes; use common::{server::TestFlightServer, trailers_layer::TrailersLayer}; use futures::{Future, StreamExt, TryStreamExt}; +use prost::Message; use tokio::{net::TcpListener, task::JoinHandle}; use tonic::{ transport::{Channel, Uri}, @@ -106,6 +108,7 @@ fn test_flight_info(request: &FlightDescriptor) -> FlightInfo { total_bytes: 123, total_records: 456, ordered: false, + app_metadata: Bytes::new(), } } @@ -141,6 +144,47 @@ async fn test_get_flight_info_error() { .await; } +fn test_poll_info(request: &FlightDescriptor) -> PollInfo { + PollInfo { + info: Some(test_flight_info(request)), + flight_descriptor: None, + progress: Some(1.0), + expiration_time: None, + } +} + +#[tokio::test] +async fn test_poll_flight_info() { + do_test(|test_server, mut client| async move { + client.add_header("foo-header", "bar-header-value").unwrap(); + let request = FlightDescriptor::new_cmd(b"My Command".to_vec()); + + let expected_response = test_poll_info(&request); + test_server.set_poll_flight_info_response(Ok(expected_response.clone())); + + let response = client.poll_flight_info(request.clone()).await.unwrap(); + + assert_eq!(response, expected_response); + assert_eq!(test_server.take_poll_flight_info_request(), Some(request)); + ensure_metadata(&client, &test_server); + }) + .await; +} + +#[tokio::test] +async fn test_poll_flight_info_error() { + do_test(|test_server, mut client| async move { + let request = FlightDescriptor::new_cmd(b"My Command".to_vec()); + + let e = Status::unauthenticated("DENIED"); + test_server.set_poll_flight_info_response(Err(e.clone())); + + let response = client.poll_flight_info(request.clone()).await.unwrap_err(); + expect_status(response, e); + }) + .await; +} + // TODO more negative tests (like if there are endpoints defined, etc) #[tokio::test] @@ -852,6 +896,105 @@ async fn test_do_action_error_in_stream() { .await; } +#[tokio::test] +async fn test_cancel_flight_info() { + do_test(|test_server, mut client| async move { + client.add_header("foo-header", "bar-header-value").unwrap(); + + let expected_response = CancelFlightInfoResult::new(CancelStatus::Cancelled); + let response = expected_response.encode_to_vec(); + let response = Ok(arrow_flight::Result::new(response)); + test_server.set_do_action_response(vec![response]); + + let request = CancelFlightInfoRequest::new(FlightInfo::new()); + let actual_response = client + .cancel_flight_info(request.clone()) + .await + .expect("error making request"); + + let expected_request = Action::new("CancelFlightInfo", request.encode_to_vec()); + assert_eq!(actual_response, expected_response); + assert_eq!(test_server.take_do_action_request(), Some(expected_request)); + ensure_metadata(&client, &test_server); + }) + .await; +} + +#[tokio::test] +async fn test_cancel_flight_info_error_no_response() { + do_test(|test_server, mut client| async move { + client.add_header("foo-header", "bar-header-value").unwrap(); + + test_server.set_do_action_response(vec![]); + + let request = CancelFlightInfoRequest::new(FlightInfo::new()); + let err = client + .cancel_flight_info(request.clone()) + .await + .unwrap_err(); + + assert_eq!( + err.to_string(), + "ProtocolError(\"Received no response for cancel_flight_info call\")" + ); + // server still got the request + let expected_request = Action::new("CancelFlightInfo", request.encode_to_vec()); + assert_eq!(test_server.take_do_action_request(), Some(expected_request)); + ensure_metadata(&client, &test_server); + }) + .await; +} + +#[tokio::test] +async fn test_renew_flight_endpoint() { + do_test(|test_server, mut client| async move { + client.add_header("foo-header", "bar-header-value").unwrap(); + + let expected_response = FlightEndpoint::new().with_app_metadata(vec![1]); + let response = expected_response.encode_to_vec(); + let response = Ok(arrow_flight::Result::new(response)); + test_server.set_do_action_response(vec![response]); + + let request = + RenewFlightEndpointRequest::new(FlightEndpoint::new().with_app_metadata(vec![0])); + let actual_response = client + .renew_flight_endpoint(request.clone()) + .await + .expect("error making request"); + + let expected_request = Action::new("RenewFlightEndpoint", request.encode_to_vec()); + assert_eq!(actual_response, expected_response); + assert_eq!(test_server.take_do_action_request(), Some(expected_request)); + ensure_metadata(&client, &test_server); + }) + .await; +} + +#[tokio::test] +async fn test_renew_flight_endpoint_error_no_response() { + do_test(|test_server, mut client| async move { + client.add_header("foo-header", "bar-header-value").unwrap(); + + test_server.set_do_action_response(vec![]); + + let request = RenewFlightEndpointRequest::new(FlightEndpoint::new()); + let err = client + .renew_flight_endpoint(request.clone()) + .await + .unwrap_err(); + + assert_eq!( + err.to_string(), + "ProtocolError(\"Received no response for renew_flight_endpoint call\")" + ); + // server still got the request + let expected_request = Action::new("RenewFlightEndpoint", request.encode_to_vec()); + assert_eq!(test_server.take_do_action_request(), Some(expected_request)); + ensure_metadata(&client, &test_server); + }) + .await; +} + async fn test_flight_data() -> Vec { let batch = RecordBatch::try_from_iter(vec![( "col", diff --git a/arrow-flight/tests/common/server.rs b/arrow-flight/tests/common/server.rs index 8b162d398c4b..a75590a13334 100644 --- a/arrow-flight/tests/common/server.rs +++ b/arrow-flight/tests/common/server.rs @@ -26,7 +26,7 @@ use arrow_flight::{ encode::FlightDataEncoderBuilder, flight_service_server::{FlightService, FlightServiceServer}, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, - HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, + HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; #[derive(Debug, Clone)] @@ -54,11 +54,10 @@ impl TestFlightServer { /// Specify the response returned from the next call to handshake pub fn set_handshake_response(&self, response: Result) { let mut state = self.state.lock().expect("mutex not poisoned"); - state.handshake_response.replace(response); } - /// Take and return last handshake request send to the server, + /// Take and return last handshake request sent to the server, pub fn take_handshake_request(&self) -> Option { self.state .lock() @@ -67,14 +66,13 @@ impl TestFlightServer { .take() } - /// Specify the response returned from the next call to handshake + /// Specify the response returned from the next call to get_flight_info pub fn set_get_flight_info_response(&self, response: Result) { let mut state = self.state.lock().expect("mutex not poisoned"); - state.get_flight_info_response.replace(response); } - /// Take and return last get_flight_info request send to the server, + /// Take and return last get_flight_info request sent to the server, pub fn take_get_flight_info_request(&self) -> Option { self.state .lock() @@ -83,6 +81,21 @@ impl TestFlightServer { .take() } + /// Specify the response returned from the next call to poll_flight_info + pub fn set_poll_flight_info_response(&self, response: Result) { + let mut state = self.state.lock().expect("mutex not poisoned"); + state.poll_flight_info_response.replace(response); + } + + /// Take and return last poll_flight_info request sent to the server, + pub fn take_poll_flight_info_request(&self) -> Option { + self.state + .lock() + .expect("mutex not poisoned") + .poll_flight_info_request + .take() + } + /// Specify the response returned from the next call to `do_get` pub fn set_do_get_response(&self, response: Vec>) { let mut state = self.state.lock().expect("mutex not poisoned"); @@ -104,7 +117,7 @@ impl TestFlightServer { state.do_put_response.replace(response); } - /// Take and return last do_put request send to the server, + /// Take and return last do_put request sent to the server, pub fn take_do_put_request(&self) -> Option> { self.state .lock() @@ -214,8 +227,12 @@ struct State { pub handshake_response: Option>, /// The last `get_flight_info` request received pub get_flight_info_request: Option, - /// the next response to return from `get_flight_info` + /// The next response to return from `get_flight_info` pub get_flight_info_response: Option>, + /// The last `poll_flight_info` request received + pub poll_flight_info_request: Option, + /// The next response to return from `poll_flight_info` + pub poll_flight_info_response: Option>, /// The last do_get request received pub do_get_request: Option, /// The next response returned from `do_get` @@ -318,6 +335,20 @@ impl FlightService for TestFlightServer { Ok(Response::new(response)) } + async fn poll_flight_info( + &self, + request: Request, + ) -> Result, Status> { + self.save_metadata(&request); + let mut state = self.state.lock().expect("mutex not poisoned"); + state.poll_flight_info_request = Some(request.into_inner()); + let response = state + .poll_flight_info_response + .take() + .unwrap_or_else(|| Err(Status::internal("No poll_flight_info response configured")))?; + Ok(Response::new(response)) + } + async fn get_schema( &self, request: Request, diff --git a/arrow-integration-testing/src/flight_server_scenarios.rs b/arrow-integration-testing/src/flight_server_scenarios.rs index 9034776c68d4..48d4e6045684 100644 --- a/arrow-integration-testing/src/flight_server_scenarios.rs +++ b/arrow-integration-testing/src/flight_server_scenarios.rs @@ -44,5 +44,7 @@ pub fn endpoint(ticket: &str, location_uri: impl Into) -> FlightEndpoint location: vec![Location { uri: location_uri.into(), }], + expiration_time: None, + app_metadata: vec![].into(), } } diff --git a/arrow-integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/arrow-integration-testing/src/flight_server_scenarios/auth_basic_proto.rs index ff4fc12f2523..20d868953664 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/auth_basic_proto.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/auth_basic_proto.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow_flight::{ flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action, ActionType, BasicAuth, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, - HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, + HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult, Ticket, }; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; @@ -178,6 +178,14 @@ impl FlightService for AuthBasicProtoScenarioImpl { Err(Status::unimplemented("Not yet implemented")) } + async fn poll_flight_info( + &self, + request: Request, + ) -> Result, Status> { + self.check_auth(request.metadata()).await?; + Err(Status::unimplemented("Not yet implemented")) + } + async fn do_put( &self, request: Request>, diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs index 2011031e921a..623a240348f4 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs @@ -32,7 +32,7 @@ use arrow_flight::{ flight_descriptor::DescriptorType, flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage, - PutResult, SchemaAsIpc, SchemaResult, Ticket, + PollInfo, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use std::convert::TryInto; @@ -196,6 +196,7 @@ impl FlightService for FlightServiceImpl { total_records: total_records as i64, total_bytes: -1, ordered: false, + app_metadata: vec![].into(), }; Ok(Response::new(info)) @@ -204,6 +205,13 @@ impl FlightService for FlightServiceImpl { } } + async fn poll_flight_info( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + async fn do_put( &self, request: Request>, diff --git a/arrow-integration-testing/src/flight_server_scenarios/middleware.rs b/arrow-integration-testing/src/flight_server_scenarios/middleware.rs index 68d871b528a6..e8d9c521bb99 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/middleware.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/middleware.rs @@ -20,8 +20,8 @@ use std::pin::Pin; use arrow_flight::{ flight_descriptor::DescriptorType, flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, FlightData, - FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, - Ticket, + FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PollInfo, PutResult, + SchemaResult, Ticket, }; use futures::Stream; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -120,6 +120,13 @@ impl FlightService for MiddlewareScenarioImpl { Err(status) } + async fn poll_flight_info( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + async fn do_put( &self, _request: Request>, diff --git a/format/Flight.proto b/format/Flight.proto index 9b44331a5765..653b983ef2c7 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -17,9 +17,10 @@ */ syntax = "proto3"; + import "google/protobuf/timestamp.proto"; option java_package = "org.apache.arrow.flight.impl"; - option go_package = "github.com/apache/arrow/go/arrow/flight/internal/flight"; + option go_package = "github.com/apache/arrow/go/arrow/flight/gen/flight"; option csharp_namespace = "Apache.Arrow.Flight.Protocol"; package arrow.flight.protocol; @@ -64,6 +65,32 @@ */ rpc GetFlightInfo(FlightDescriptor) returns (FlightInfo) {} + /* + * For a given FlightDescriptor, start a query and get information + * to poll its execution status. This is a useful interface if the + * query may be a long-running query. The first PollFlightInfo call + * should return as quickly as possible. (GetFlightInfo doesn't + * return until the query is complete.) + * + * A client can consume any available results before + * the query is completed. See PollInfo.info for details. + * + * A client can poll the updated query status by calling + * PollFlightInfo() with PollInfo.flight_descriptor. A server + * should not respond until the result would be different from last + * time. That way, the client can "long poll" for updates + * without constantly making requests. Clients can set a short timeout + * to avoid blocking calls if desired. + * + * A client can't use PollInfo.flight_descriptor after + * PollInfo.expiration_time passes. A server might not accept the + * retry descriptor anymore and the query may be cancelled. + * + * A client may use the CancelFlightInfo action with + * PollInfo.info to cancel the running query. + */ + rpc PollFlightInfo(FlightDescriptor) returns (PollInfo) {} + /* * For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema * This is used when a consumer needs the Schema of flight stream. Similar to @@ -182,6 +209,24 @@ bytes body = 2; } + /* + * The request of the CancelFlightInfo action. + * + * The request should be stored in Action.body. + */ + message CancelFlightInfoRequest { + FlightInfo info = 1; + } + + /* + * The request of the RenewFlightEndpoint action. + * + * The request should be stored in Action.body. + */ + message RenewFlightEndpointRequest { + FlightEndpoint endpoint = 1; + } + /* * An opaque result returned after executing an action. */ @@ -189,6 +234,36 @@ bytes body = 1; } + /* + * The result of a cancel operation. + * + * This is used by CancelFlightInfoResult.status. + */ + enum CancelStatus { + // The cancellation status is unknown. Servers should avoid using + // this value (send a NOT_FOUND error if the requested query is + // not known). Clients can retry the request. + CANCEL_STATUS_UNSPECIFIED = 0; + // The cancellation request is complete. Subsequent requests with + // the same payload may return CANCELLED or a NOT_FOUND error. + CANCEL_STATUS_CANCELLED = 1; + // The cancellation request is in progress. The client may retry + // the cancellation request. + CANCEL_STATUS_CANCELLING = 2; + // The query is not cancellable. The client should not retry the + // cancellation request. + CANCEL_STATUS_NOT_CANCELLABLE = 3; + } + + /* + * The result of the CancelFlightInfo action. + * + * The result should be stored in Result.body. + */ + message CancelFlightInfoResult { + CancelStatus status = 1; + } + /* * Wrap the result of a getSchema call */ @@ -292,6 +367,61 @@ * FlightEndpoints are in the same order as the data. */ bool ordered = 6; + + /* + * Application-defined metadata. + * + * There is no inherent or required relationship between this + * and the app_metadata fields in the FlightEndpoints or resulting + * FlightData messages. Since this metadata is application-defined, + * a given application could define there to be a relationship, + * but there is none required by the spec. + */ + bytes app_metadata = 7; + } + + /* + * The information to process a long-running query. + */ + message PollInfo { + /* + * The currently available results. + * + * If "flight_descriptor" is not specified, the query is complete + * and "info" specifies all results. Otherwise, "info" contains + * partial query results. + * + * Note that each PollInfo response contains a complete + * FlightInfo (not just the delta between the previous and current + * FlightInfo). + * + * Subsequent PollInfo responses may only append new endpoints to + * info. + * + * Clients can begin fetching results via DoGet(Ticket) with the + * ticket in the info before the query is + * completed. FlightInfo.ordered is also valid. + */ + FlightInfo info = 1; + + /* + * The descriptor the client should use on the next try. + * If unset, the query is complete. + */ + FlightDescriptor flight_descriptor = 2; + + /* + * Query progress. If known, must be in [0.0, 1.0] but need not be + * monotonic or nondecreasing. If unknown, do not set. + */ + optional double progress = 3; + + /* + * Expiration time for this request. After this passes, the server + * might not accept the retry descriptor anymore (and the query may + * be cancelled). This may be updated on a call to PollFlightInfo. + */ + google.protobuf.Timestamp expiration_time = 4; } /* @@ -321,6 +451,24 @@ * represent redundant and/or load balanced services. */ repeated Location location = 2; + + /* + * Expiration time of this stream. If present, clients may assume + * they can retry DoGet requests. Otherwise, it is + * application-defined whether DoGet requests may be retried. + */ + google.protobuf.Timestamp expiration_time = 3; + + /* + * Application-defined metadata. + * + * There is no inherent or required relationship between this + * and the app_metadata fields in the FlightInfo or resulting + * FlightData messages. Since this metadata is application-defined, + * a given application could define there to be a relationship, + * but there is none required by the spec. + */ + bytes app_metadata = 4; } /* @@ -377,4 +525,4 @@ */ message PutResult { bytes app_metadata = 1; - } \ No newline at end of file + } diff --git a/format/FlightSql.proto b/format/FlightSql.proto index 0acf647e1045..f78e77e23278 100644 --- a/format/FlightSql.proto +++ b/format/FlightSql.proto @@ -20,7 +20,7 @@ import "google/protobuf/descriptor.proto"; option java_package = "org.apache.arrow.flight.sql.impl"; - option go_package = "github.com/apache/arrow/go/arrow/flight/internal/flight"; + option go_package = "github.com/apache/arrow/go/arrow/flight/gen/flight"; package arrow.flight.protocol.sql; /* @@ -551,7 +551,7 @@ // Retrieves a int64 value representing the maximum number of characters allowed for a column name. SQL_MAX_COLUMN_NAME_LENGTH = 543; - // Retrieves a int64 value representing the the maximum number of columns allowed in a GROUP BY clause. + // Retrieves a int64 value representing the maximum number of columns allowed in a GROUP BY clause. SQL_MAX_COLUMNS_IN_GROUP_BY = 544; // Retrieves a int64 value representing the maximum number of columns allowed in an index. @@ -943,7 +943,7 @@ /** * The JDBC/ODBC-defined type of any object. - * All the values here are the sames as in the JDBC and ODBC specs. + * All the values here are the same as in the JDBC and ODBC specs. */ enum XdbcDataType { XDBC_UNKNOWN_TYPE = 0; @@ -1023,14 +1023,14 @@ NULLABILITY_NULLABLE = 1; /** - * Indicates that nullability of the fields can not be determined. + * Indicates that nullability of the fields cannot be determined. */ NULLABILITY_UNKNOWN = 2; } enum Searchable { /** - * Indicates that column can not be used in a WHERE clause. + * Indicates that column cannot be used in a WHERE clause. */ SEARCHABLE_NONE = 0; @@ -1196,7 +1196,7 @@ * - ARROW:FLIGHT:SQL:PRECISION - Column precision/size * - ARROW:FLIGHT:SQL:SCALE - Column scale/decimal digits if applicable * - ARROW:FLIGHT:SQL:IS_AUTO_INCREMENT - "1" indicates if the column is auto incremented, "0" otherwise. - * - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case sensitive, "0" otherwise. + * - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case-sensitive, "0" otherwise. * - ARROW:FLIGHT:SQL:IS_READ_ONLY - "1" indicates if the column is read only, "0" otherwise. * - ARROW:FLIGHT:SQL:IS_SEARCHABLE - "1" indicates if the column is searchable via WHERE clause, "0" otherwise. * The returned data should be ordered by catalog_name, db_schema_name, table_name, then table_type, followed by table_schema if requested. @@ -1537,11 +1537,14 @@ bytes prepared_statement_handle = 1; // If a result set generating query was provided, dataset_schema contains the - // schema of the dataset as described in Schema.fbs::Schema, it is serialized as an IPC message. + // schema of the result set. It should be an IPC-encapsulated Schema, as described in Schema.fbs. + // For some queries, the schema of the results may depend on the schema of the parameters. The server + // should provide its best guess as to the schema at this point. Clients must not assume that this + // schema, if provided, will be accurate. bytes dataset_schema = 2; // If the query provided contained parameters, parameter_schema contains the - // schema of the expected parameters as described in Schema.fbs::Schema, it is serialized as an IPC message. + // schema of the expected parameters. It should be an IPC-encapsulated Schema, as described in Schema.fbs. bytes parameter_schema = 3; } @@ -1676,7 +1679,7 @@ * - ARROW:FLIGHT:SQL:PRECISION - Column precision/size * - ARROW:FLIGHT:SQL:SCALE - Column scale/decimal digits if applicable * - ARROW:FLIGHT:SQL:IS_AUTO_INCREMENT - "1" indicates if the column is auto incremented, "0" otherwise. - * - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case sensitive, "0" otherwise. + * - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case-sensitive, "0" otherwise. * - ARROW:FLIGHT:SQL:IS_READ_ONLY - "1" indicates if the column is read only, "0" otherwise. * - ARROW:FLIGHT:SQL:IS_SEARCHABLE - "1" indicates if the column is searchable via WHERE clause, "0" otherwise. * - GetFlightInfo: execute the query. @@ -1702,7 +1705,7 @@ * - ARROW:FLIGHT:SQL:PRECISION - Column precision/size * - ARROW:FLIGHT:SQL:SCALE - Column scale/decimal digits if applicable * - ARROW:FLIGHT:SQL:IS_AUTO_INCREMENT - "1" indicates if the column is auto incremented, "0" otherwise. - * - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case sensitive, "0" otherwise. + * - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case-sensitive, "0" otherwise. * - ARROW:FLIGHT:SQL:IS_READ_ONLY - "1" indicates if the column is read only, "0" otherwise. * - ARROW:FLIGHT:SQL:IS_SEARCHABLE - "1" indicates if the column is searchable via WHERE clause, "0" otherwise. * - GetFlightInfo: execute the query. @@ -1740,9 +1743,12 @@ * - ARROW:FLIGHT:SQL:PRECISION - Column precision/size * - ARROW:FLIGHT:SQL:SCALE - Column scale/decimal digits if applicable * - ARROW:FLIGHT:SQL:IS_AUTO_INCREMENT - "1" indicates if the column is auto incremented, "0" otherwise. - * - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case sensitive, "0" otherwise. + * - ARROW:FLIGHT:SQL:IS_CASE_SENSITIVE - "1" indicates if the column is case-sensitive, "0" otherwise. * - ARROW:FLIGHT:SQL:IS_READ_ONLY - "1" indicates if the column is read only, "0" otherwise. * - ARROW:FLIGHT:SQL:IS_SEARCHABLE - "1" indicates if the column is searchable via WHERE clause, "0" otherwise. + * + * If the schema is retrieved after parameter values have been bound with DoPut, then the server should account + * for the parameters when determining the schema. * - DoPut: bind parameter values. All of the bound parameter sets will be executed as a single atomic execution. * - GetFlightInfo: execute the prepared statement instance. */ @@ -1755,7 +1761,7 @@ /* * Represents a SQL update query. Used in the command member of FlightDescriptor - * for the the RPC call DoPut to cause the server to execute the included SQL update. + * for the RPC call DoPut to cause the server to execute the included SQL update. */ message CommandStatementUpdate { option (experimental) = true; @@ -1768,7 +1774,7 @@ /* * Represents a SQL update query. Used in the command member of FlightDescriptor - * for the the RPC call DoPut to cause the server to execute the included + * for the RPC call DoPut to cause the server to execute the included * prepared statement handle as an update. */ message CommandPreparedStatementUpdate { @@ -1804,8 +1810,12 @@ * data. * * This command is idempotent. + * + * This command is deprecated since 13.0.0. Use the "CancelFlightInfo" + * action with DoAction instead. */ message ActionCancelQueryRequest { + option deprecated = true; option (experimental) = true; // The result of the GetFlightInfo RPC that initiated the query. @@ -1819,8 +1829,12 @@ * The result of cancelling a query. * * The result should be wrapped in a google.protobuf.Any message. + * + * This command is deprecated since 13.0.0. Use the "CancelFlightInfo" + * action with DoAction instead. */ message ActionCancelQueryResult { + option deprecated = true; option (experimental) = true; enum CancelResult { @@ -1844,4 +1858,4 @@ extend google.protobuf.MessageOptions { bool experimental = 1000; - } \ No newline at end of file + }