Skip to content

Commit

Permalink
feat(flight): add helpers to handle CommandGetCatalogs, `CommandGet…
Browse files Browse the repository at this point in the history
…Schemas`, and `CommandGetTables` requests (#4296)

* feat: add get catalog helpers

* fix: clippy in arrow_row example

* feat: add db schemas helpers

* chore: cleanup  db schemas helpers

* feat: add table schema hlpers

* test: add tests and docs

* docs: add table queries to example server

* docs: improve builder docs

* fix: docs links

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* Improve docs and tests for `SqlInfoList (#4293)

* Improve docs and tests for SqlInfoList

* Add an example/

* Update arrow-flight/src/sql/sql_info.rs

Co-authored-by: Liang-Chi Hsieh <[email protected]>

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>

* fix: use FlightInfo builders

* chore: clippy

* fmt

* feat: add filters to GetTablesBuilder

* fix: clippy

* feat: more consistent builder apis

* chore: cleanup

---------

Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
  • Loading branch information
3 people authored Jun 1, 2023
1 parent a9c5c97 commit dde6539
Show file tree
Hide file tree
Showing 7 changed files with 1,001 additions and 24 deletions.
9 changes: 7 additions & 2 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ repository = { workspace = true }
license = { workspace = true }

[dependencies]
arrow-arith = { workspace = true, optional = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
# Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389
arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-data = { workspace = true, optional = true }
arrow-ipc = { workspace = true }
arrow-ord = { workspace = true, optional = true }
arrow-row = { workspace = true, optional = true }
arrow-select = { workspace = true, optional = true }
arrow-schema = { workspace = true }
arrow-string = { workspace = true, optional = true }
base64 = { version = "0.21", default-features = false, features = ["std"] }
bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand All @@ -53,7 +58,7 @@ all-features = true

[features]
default = []
flight-sql-experimental = ["once_cell"]
flight-sql-experimental = ["arrow-arith", "arrow-data", "arrow-ord", "arrow-row", "arrow-select", "arrow-string", "once_cell"]
tls = ["tonic/tls"]

# Enable CLI tools
Expand Down
140 changes: 119 additions & 21 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use base64::Engine;
use futures::{stream, Stream, TryStreamExt};
use once_cell::sync::Lazy;
use prost::Message;
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use tonic::transport::Server;
Expand All @@ -29,6 +30,9 @@ use tonic::{Request, Response, Status, Streaming};
use arrow_array::builder::StringBuilder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::sql::catalogs::{
get_catalogs_schema, get_db_schemas_schema, get_tables_schema,
};
use arrow_flight::sql::sql_info::SqlInfoList;
use arrow_flight::sql::{
server::FlightSqlService, ActionBeginSavepointRequest, ActionBeginSavepointResult,
Expand Down Expand Up @@ -72,6 +76,8 @@ static INSTANCE_SQL_INFO: Lazy<SqlInfoList> = Lazy::new(|| {
.with_sql_info(SqlInfo::FlightSqlServerArrowVersion, "1.3")
});

static TABLES: Lazy<Vec<&'static str>> = Lazy::new(|| vec!["flight_sql.example.table"]);

#[derive(Clone)]
pub struct FlightSqlServiceImpl {}

Expand Down Expand Up @@ -236,32 +242,62 @@ impl FlightSqlService for FlightSqlServiceImpl {

async fn get_flight_info_catalogs(
&self,
_query: CommandGetCatalogs,
_request: Request<FlightDescriptor>,
query: CommandGetCatalogs,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented(
"get_flight_info_catalogs not implemented",
))
let flight_descriptor = request.into_inner();
let ticket = Ticket {
ticket: query.encode_to_vec().into(),
};
let endpoint = FlightEndpoint::new().with_ticket(ticket);

let flight_info = FlightInfo::new()
.try_with_schema(get_catalogs_schema())
.map_err(|e| status!("Unable to encode schema", e))?
.with_endpoint(endpoint)
.with_descriptor(flight_descriptor);

Ok(tonic::Response::new(flight_info))
}

async fn get_flight_info_schemas(
&self,
_query: CommandGetDbSchemas,
_request: Request<FlightDescriptor>,
query: CommandGetDbSchemas,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented(
"get_flight_info_schemas not implemented",
))
let flight_descriptor = request.into_inner();
let ticket = Ticket {
ticket: query.encode_to_vec().into(),
};
let endpoint = FlightEndpoint::new().with_ticket(ticket);

let flight_info = FlightInfo::new()
.try_with_schema(get_db_schemas_schema().as_ref())
.map_err(|e| status!("Unable to encode schema", e))?
.with_endpoint(endpoint)
.with_descriptor(flight_descriptor);

Ok(tonic::Response::new(flight_info))
}

async fn get_flight_info_tables(
&self,
_query: CommandGetTables,
_request: Request<FlightDescriptor>,
query: CommandGetTables,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented(
"get_flight_info_tables not implemented",
))
let flight_descriptor = request.into_inner();
let ticket = Ticket {
ticket: query.encode_to_vec().into(),
};
let endpoint = FlightEndpoint::new().with_ticket(ticket);

let flight_info = FlightInfo::new()
.try_with_schema(get_tables_schema(query.include_schema).as_ref())
.map_err(|e| status!("Unable to encode schema", e))?
.with_endpoint(endpoint)
.with_descriptor(flight_descriptor);

Ok(tonic::Response::new(flight_info))
}

async fn get_flight_info_table_types(
Expand Down Expand Up @@ -363,26 +399,88 @@ impl FlightSqlService for FlightSqlServiceImpl {

async fn do_get_catalogs(
&self,
_query: CommandGetCatalogs,
query: CommandGetCatalogs,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
Err(Status::unimplemented("do_get_catalogs not implemented"))
let catalog_names = TABLES
.iter()
.map(|full_name| full_name.split('.').collect::<Vec<_>>()[0].to_string())
.collect::<HashSet<_>>();
let mut builder = query.into_builder();
for catalog_name in catalog_names {
builder.append(catalog_name);
}
let batch = builder.build();
let stream = FlightDataEncoderBuilder::new()
.with_schema(Arc::new(get_catalogs_schema().clone()))
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Ok(Response::new(Box::pin(stream)))
}

async fn do_get_schemas(
&self,
_query: CommandGetDbSchemas,
query: CommandGetDbSchemas,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
Err(Status::unimplemented("do_get_schemas not implemented"))
let schemas = TABLES
.iter()
.map(|full_name| {
let parts = full_name.split('.').collect::<Vec<_>>();
(parts[0].to_string(), parts[1].to_string())
})
.collect::<HashSet<_>>();

let mut builder = query.into_builder();
for (catalog_name, schema_name) in schemas {
builder.append(catalog_name, schema_name);
}

let batch = builder.build();
let stream = FlightDataEncoderBuilder::new()
.with_schema(get_db_schemas_schema())
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Ok(Response::new(Box::pin(stream)))
}

async fn do_get_tables(
&self,
_query: CommandGetTables,
query: CommandGetTables,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
Err(Status::unimplemented("do_get_tables not implemented"))
let tables = TABLES
.iter()
.map(|full_name| {
let parts = full_name.split('.').collect::<Vec<_>>();
(
parts[0].to_string(),
parts[1].to_string(),
parts[2].to_string(),
)
})
.collect::<HashSet<_>>();

let dummy_schema = Schema::empty();
let mut builder = query.into_builder();
for (catalog_name, schema_name, table_name) in tables {
builder
.append(
catalog_name,
schema_name,
table_name,
"TABLE",
&dummy_schema,
)
.map_err(Status::from)?;
}

let batch = builder.build();
let stream = FlightDataEncoderBuilder::new()
.with_schema(get_db_schemas_schema())
.build(futures::stream::once(async { batch }))
.map_err(Status::from);
Ok(Response::new(Box::pin(stream)))
}

async fn do_get_table_types(
Expand Down
Loading

0 comments on commit dde6539

Please sign in to comment.