Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow Flight SQL example JDBC driver incompatibility #5666

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use once_cell::sync::Lazy;
use prost::Message;
use std::collections::HashSet;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use tonic::metadata::MetadataValue;
use tonic::transport::Server;
use tonic::transport::{Certificate, Identity, ServerTlsConfig};
use tonic::{Request, Response, Status, Streaming};
Expand Down Expand Up @@ -106,7 +108,9 @@ static INSTANCE_XBDC_DATA: Lazy<XdbcTypeInfoData> = Lazy::new(|| {
static TABLES: Lazy<Vec<&'static str>> = Lazy::new(|| vec!["flight_sql.example.table"]);

#[derive(Clone)]
pub struct FlightSqlServiceImpl {}
pub struct FlightSqlServiceImpl {
location: String,
}

impl FlightSqlServiceImpl {
fn check_token<T>(&self, req: &Request<T>) -> Result<(), Status> {
Expand Down Expand Up @@ -184,7 +188,15 @@ impl FlightSqlService for FlightSqlServiceImpl {
};
let result = Ok(result);
let output = futures::stream::iter(vec![result]);
return Ok(Response::new(Box::pin(output)));

let token = format!("Bearer {}", FAKE_TOKEN);
let mut response: Response<Pin<Box<dyn Stream<Item = _> + Send>>> =
Response::new(Box::pin(output));
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved
response.metadata_mut().append(
"authorization",
MetadataValue::from_str(token.as_str()).unwrap(),
);
return Ok(response);
}

async fn do_get_fallback(
Expand Down Expand Up @@ -235,12 +247,13 @@ impl FlightSqlService for FlightSqlServiceImpl {
self.check_token(&request)?;
let handle = std::str::from_utf8(&cmd.prepared_statement_handle)
.map_err(|e| status!("Unable to parse handle", e))?;

let batch = Self::fake_result().map_err(|e| status!("Could not fake a result", e))?;
let schema = (*batch.schema()).clone();
let num_rows = batch.num_rows();
let num_bytes = batch.get_array_memory_size();
let loc = Location {
uri: "grpc+tcp://127.0.0.1".to_string(),
uri: self.location.clone(),
};
let fetch = FetchResults {
handle: handle.to_string(),
Expand Down Expand Up @@ -662,9 +675,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
_query: ActionClosePreparedStatementRequest,
_request: Request<Action>,
) -> Result<(), Status> {
Err(Status::unimplemented(
"Implement do_action_close_prepared_statement",
))
Ok(())
}

async fn do_action_create_prepared_substrait_plan(
Expand Down Expand Up @@ -725,9 +736,8 @@ impl FlightSqlService for FlightSqlServiceImpl {
/// This example shows how to run a FlightSql server
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:50051".parse()?;

let svc = FlightServiceServer::new(FlightSqlServiceImpl {});
let addr_str = "127.0.0.1:50051";
istvan-fodor marked this conversation as resolved.
Show resolved Hide resolved
let addr = addr_str.parse()?;

println!("Listening on {:?}", addr);

Expand All @@ -736,6 +746,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let key = std::fs::read_to_string("arrow-flight/examples/data/server.key")?;
let client_ca = std::fs::read_to_string("arrow-flight/examples/data/client_ca.pem")?;

let svc = FlightServiceServer::new(FlightSqlServiceImpl {
location: format!("grpc+tls://{}", addr_str),
});
let tls_config = ServerTlsConfig::new()
.identity(Identity::from_pem(&cert, &key))
.client_ca_root(Certificate::from_pem(&client_ca));
Expand All @@ -746,6 +759,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.serve(addr)
.await?;
} else {
let svc = FlightServiceServer::new(FlightSqlServiceImpl {
location: format!("grpc+tcp://{}", addr_str),
});

Server::builder().add_service(svc).serve(addr).await?;
}

Expand Down Expand Up @@ -828,7 +845,9 @@ mod tests {
let uds = UnixListener::bind(path.clone()).unwrap();
let stream = UnixListenerStream::new(uds);

let service = FlightSqlServiceImpl {};
let service = FlightSqlServiceImpl {
location: format!("grpc+unix://{}", path.clone()),
};
let serve_future = Server::builder()
.add_service(FlightServiceServer::new(service))
.serve_with_incoming(stream);
Expand Down Expand Up @@ -858,7 +877,9 @@ mod tests {
let (incoming, addr) = bind_tcp().await;
let uri = format!("http://{}:{}", addr.ip(), addr.port());

let service = FlightSqlServiceImpl {};
let service = FlightSqlServiceImpl {
location: format!("grpc+tcp://{}:{}", addr.ip(), addr.port()),
};
let serve_future = Server::builder()
.add_service(FlightServiceServer::new(service))
.serve_with_incoming(incoming);
Expand Down Expand Up @@ -892,7 +913,9 @@ mod tests {
let (incoming, addr) = bind_tcp().await;
let uri = format!("https://{}:{}", addr.ip(), addr.port());

let svc = FlightServiceServer::new(FlightSqlServiceImpl {});
let svc = FlightServiceServer::new(FlightSqlServiceImpl {
location: format!("grc+tls://{}:{}", addr.ip(), addr.port()),
});

let serve_future = Server::builder()
.tls_config(tls_config)
Expand Down Expand Up @@ -999,15 +1022,6 @@ mod tests {
.to_string()
.contains("Invalid credentials"));

// forget to set_token
client.handshake("admin", "password").await.unwrap();
assert!(client
.prepare("select 1;".to_string(), None)
.await
.unwrap_err()
.to_string()
.contains("No authorization header"));
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved

// Invalid Tokens
client.handshake("admin", "password").await.unwrap();
client.set_token("wrong token".to_string());
Expand All @@ -1017,6 +1031,12 @@ mod tests {
.unwrap_err()
.to_string()
.contains("invalid token"));

client.clear_token();

// Successful call (token is automatically set by handshake)
client.handshake("admin", "password").await.unwrap();
client.prepare("select 1;".to_string(), None).await.unwrap();
})
.await
}
Expand Down
5 changes: 5 additions & 0 deletions arrow-flight/src/sql/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ impl FlightSqlServiceClient<Channel> {
self.token = Some(token);
}

/// Clear the auth token.
pub fn clear_token(&mut self) {
self.token = None;
}

/// Set header value.
pub fn set_header(&mut self, key: impl Into<String>, value: impl Into<String>) {
let key: String = key.into();
Expand Down
Loading