
Consider support / implementation of Arrow Flight SQL #1323

Closed
2 tasks done
alamb opened this issue Feb 16, 2022 · 13 comments
Labels
arrow-flight (Changes to the arrow-flight crate), enhancement (Any new improvement worthy of an entry in the changelog), help wanted

Comments

@alamb
Contributor

alamb commented Feb 16, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Arrow Flight SQL has just been officially announced https://arrow.apache.org/blog/2022/02/16/introducing-arrow-flight-sql/

It would be very cool to have a Rust implementation of this protocol. It would make it easier to build Rust systems that use this efficient new protocol, and it would allow better ecosystem integration.

Self-servingly, we would likely use it in influxdb_iox and contribute to such an implementation, but it isn't high on our priority list quite yet.

Implementation steps:

Describe the solution you'd like
I propose creating an ArrowFlightSQLService type that handles the protocol details and presents an idiomatic Rust trait to users.

Something like:

use arrow::array::ArrayRef;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;

pub trait SqlService {
  /// Get the list of database schemas as an array of strings
  fn schemas(&self, catalog_name: Option<&str>, db_schema_filter_pattern: Option<&str>) -> Result<ArrayRef>;

  /// Execute the query and return an async stream of record batches
  fn query(&self, sql: &str) -> Result<SendableRecordBatchStream>;
  // ...
}

/// Implementation of Arrow Flight SQL for a `SqlService`
struct ArrowFlightSQLService<S: SqlService> {
  inner: S,
}

/// Implements the service from https://github.com/apache/arrow/blob/release-7.0.0/format/Flight.proto,
/// handling the messages in https://github.com/apache/arrow/blob/release-7.0.0/format/FlightSql.proto
impl<S: SqlService> protobuf::generated::FlightService for ArrowFlightSQLService<S> {

  // ... code to translate to/from protobuf messages

}
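To illustrate the proposed split, here is a minimal, dependency-free sketch of the adapter pattern: the wrapper owns command dispatch and the backend implements only a small trait. It is hypothetical throughout: `Command` stands in for the decoded FlightSql.proto messages (its variants loosely mirror `CommandGetDbSchemas` and `CommandStatementQuery`), plain strings stand in for Arrow arrays and record batches, and there is no gRPC or protobuf handling.

```rust
/// Hypothetical stand-in for decoded Flight SQL commands. In the real protocol
/// these are protobuf messages such as `CommandGetDbSchemas` and
/// `CommandStatementQuery`.
#[derive(Debug)]
enum Command {
    GetDbSchemas {
        catalog: Option<String>,
        db_schema_filter_pattern: Option<String>,
    },
    StatementQuery {
        query: String,
    },
}

/// What the adapter would encode back onto the wire (simplified).
#[derive(Debug, PartialEq)]
enum Response {
    Schemas(Vec<String>),
    Rows(Vec<Vec<String>>),
}

/// The idiomatic trait a backend implements (strings instead of Arrow types).
trait SqlService {
    fn schemas(&self, catalog: Option<&str>, db_schema_filter_pattern: Option<&str>)
        -> Result<Vec<String>, String>;
    fn query(&self, sql: &str) -> Result<Vec<Vec<String>>, String>;
}

/// The adapter owns protocol-level dispatch and delegates the SQL work.
struct ArrowFlightSQLService<S: SqlService> {
    inner: S,
}

impl<S: SqlService> ArrowFlightSQLService<S> {
    fn new(inner: S) -> Self {
        Self { inner }
    }

    /// Route one decoded command to the matching trait method.
    fn handle(&self, cmd: Command) -> Result<Response, String> {
        match cmd {
            Command::GetDbSchemas { catalog, db_schema_filter_pattern } => self
                .inner
                .schemas(catalog.as_deref(), db_schema_filter_pattern.as_deref())
                .map(Response::Schemas),
            Command::StatementQuery { query } => self.inner.query(&query).map(Response::Rows),
        }
    }
}

/// Toy backend used to exercise the adapter.
struct InMemory;

impl SqlService for InMemory {
    fn schemas(&self, _catalog: Option<&str>, pattern: Option<&str>) -> Result<Vec<String>, String> {
        let all = ["public", "iox", "information_schema"];
        // Naive substring match; Flight SQL actually specifies LIKE-style patterns.
        Ok(all
            .iter()
            .filter(|s| pattern.map_or(true, |p| s.contains(p)))
            .map(|s| s.to_string())
            .collect())
    }

    fn query(&self, sql: &str) -> Result<Vec<Vec<String>>, String> {
        if sql.trim().eq_ignore_ascii_case("select 1") {
            Ok(vec![vec!["1".to_string()]])
        } else {
            Err(format!("unsupported query: {sql}"))
        }
    }
}

fn main() {
    let svc = ArrowFlightSQLService::new(InMemory);
    let cmd = Command::GetDbSchemas { catalog: None, db_schema_filter_pattern: Some("iox".into()) };
    println!("{:?}", svc.handle(cmd));
    println!("{:?}", svc.handle(Command::StatementQuery { query: "SELECT 1".into() }));
}
```

Because the adapter is generic over `S`, each backend only has to implement the trait, while the protocol translation lives in one place.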

Describe alternatives you've considered
One alternative is to make this part of the official arrow-rs release, perhaps behind a sql feature in the arrow-flight crate.

Another alternative is to make this a separate project (e.g. in https://github.com/apache/arrow-datafusion or in https://github.com/datafusion-contrib )

Additional context
DataFusion

@alamb alamb added the enhancement, arrow-flight, and help wanted labels Feb 16, 2022
@andygrove
Member

Ideally, the Ballista scheduler should also implement a Flight SQL interface.

@alamb
Contributor Author

alamb commented Feb 16, 2022

> Ideally, the Ballista scheduler should also implement a Flight SQL interface.

Yeah, I was thinking that if we had an adapter that handled the lower-level protocol details, connecting it to Ballista would be fairly straightforward.

@e-dard
Contributor

e-dard commented Feb 16, 2022

I have been working on a (very clunky right now) Rust prototype that speaks the server-side PostgreSQL wire protocol, parses queries, and then dispatches them to anything that speaks SQL over Flight, e.g., our IOx project.

In terms of the translation between a PG query/command and a backend datasource request, I suspect that I am going to end up doing a lot of the work that an implementation of the Arrow Flight SQL protocol would do.

I would be happy to try and make some progress on this, though it's not yet clear to me which bits would live in Arrow and which in DataFusion.

@alamb
Contributor Author

alamb commented Feb 16, 2022

Just to be clear: Arrow Flight SQL is a replacement for the PG wire protocol (both the command messages and the data format).

@nevi-me
Contributor

nevi-me commented Feb 16, 2022

@e-dard there was a discussion a while ago about RDBMS query passthrough in DataFusion. I tried it out using @alamb's excellent guide with the TopK node. My approach was to create a custom optimizer that understands where a LogicalPlan::TableScan's data comes from (e.g. if we've registered a Postgres schema in the catalog).
That way, if a plan includes a join of 2 tables from the same DB, the join could be rewritten into a single SQL join query. The same applies when we have something like:

select sum(a), avg(b), min(c), d from postgres.my_table
where <filter expressions>
group by d

For the above query, I've previously been able to convert the logical plan:

|_ Aggregate (sum(a), avg(b), min(c)) by d
   |_ Filter (<filter expr>)
      |_ Project (a, b, c, d)
         |_ TableScan (select * from postgres.my_table)

Into the plan below, by implementing TableScan and converting the filter-pushdown predicates into SQL expressions:

|_ Aggregate (sum(a), avg(b), min(c)) by d
   |_ TableScan (select a, b, c, d from postgres.my_table where <filter expr>)

Pushdown of aggregates, joins, unions, etc. was still missing.
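The TableScan rewrite above essentially renders the projected columns and the pushed-down filter predicates as a single SELECT. Here is a minimal sketch of that step, assuming a hypothetical `Expr` enum instead of DataFusion's real expression type and ignoring identifier quoting, type-specific literals, and dialect differences:

```rust
/// Hypothetical, heavily simplified predicate expression (not DataFusion's `Expr`).
enum Expr {
    Column(String),
    Int(i64),
    Str(String),
    Binary { left: Box<Expr>, op: &'static str, right: Box<Expr> },
}

impl Expr {
    fn col(name: &str) -> Expr {
        Expr::Column(name.to_string())
    }

    fn binary(left: Expr, op: &'static str, right: Expr) -> Expr {
        Expr::Binary { left: Box::new(left), op, right: Box::new(right) }
    }

    /// Render as SQL, parenthesising every binary expression so precedence is explicit.
    fn to_sql(&self) -> String {
        match self {
            Expr::Column(name) => name.clone(),
            Expr::Int(i) => i.to_string(),
            // Standard SQL escapes a single quote inside a string literal by doubling it.
            Expr::Str(s) => format!("'{}'", s.replace('\'', "''")),
            Expr::Binary { left, op, right } => {
                format!("({} {} {})", left.to_sql(), op, right.to_sql())
            }
        }
    }
}

/// Build the query a pushed-down TableScan would send to the remote database.
fn pushed_down_scan_sql(table: &str, projection: &[&str], filters: &[Expr]) -> String {
    let cols = if projection.is_empty() { "*".to_string() } else { projection.join(", ") };
    let mut sql = format!("SELECT {cols} FROM {table}");
    if !filters.is_empty() {
        // The pushed-down filters form a conjunction, so AND them together.
        let preds: Vec<String> = filters.iter().map(Expr::to_sql).collect();
        sql.push_str(" WHERE ");
        sql.push_str(&preds.join(" AND "));
    }
    sql
}

fn main() {
    let filters = vec![
        Expr::binary(Expr::col("a"), ">", Expr::Int(10)),
        Expr::binary(Expr::col("d"), "=", Expr::Str("x".to_string())),
    ];
    let sql = pushed_down_scan_sql("postgres.my_table", &["a", "b", "c", "d"], &filters);
    // SELECT a, b, c, d FROM postgres.my_table WHERE (a > 10) AND (d = 'x')
    println!("{sql}");
}
```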

Through trying to implement custom nodes and planners for RDBMSs, I've also found that it's quite possible to translate most LogicalPlan node types back into a SQL query (which makes sense, as we convert SQL to LogicalPlan). However, the SQL I was generating by hand would get very nested, and could benefit from some further optimization à la Apache Calcite.
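A naive translation makes the nesting problem easy to see: if every plan node renders its input as a derived table, each layer adds another subquery. The sketch below uses a hypothetical `Plan` enum rather than DataFusion's `LogicalPlan`, with expressions as pre-rendered strings; a Calcite-style rewriter would then merge adjacent layers.

```rust
/// Hypothetical, tiny subset of a logical plan (not DataFusion's `LogicalPlan`).
/// Expressions are pre-rendered SQL strings to keep the sketch short.
enum Plan {
    TableScan { table: String },
    Projection { exprs: Vec<String>, input: Box<Plan> },
    Filter { predicate: String, input: Box<Plan> },
    Aggregate { group_by: Vec<String>, aggs: Vec<String>, input: Box<Plan> },
}

impl Plan {
    /// Render the plan as SQL, wrapping every input in a derived table.
    fn to_sql(&self) -> String {
        let mut next_alias = 0;
        self.render(&mut next_alias)
    }

    fn render(&self, next_alias: &mut usize) -> String {
        match self {
            Plan::TableScan { table } => format!("SELECT * FROM {table}"),
            Plan::Projection { exprs, input } => {
                let from = input.as_subquery(next_alias);
                format!("SELECT {} FROM {from}", exprs.join(", "))
            }
            Plan::Filter { predicate, input } => {
                let from = input.as_subquery(next_alias);
                format!("SELECT * FROM {from} WHERE {predicate}")
            }
            Plan::Aggregate { group_by, aggs, input } => {
                let from = input.as_subquery(next_alias);
                // Output columns: grouping keys first, then aggregates.
                let select: Vec<&str> = group_by.iter().chain(aggs).map(String::as_str).collect();
                let mut sql = format!("SELECT {} FROM {from}", select.join(", "));
                if !group_by.is_empty() {
                    sql.push_str(&format!(" GROUP BY {}", group_by.join(", ")));
                }
                sql
            }
        }
    }

    /// Render `self` as `(<sql>) AS tN`; PostgreSQL before 16 requires an
    /// alias on every derived table.
    fn as_subquery(&self, next_alias: &mut usize) -> String {
        let inner = self.render(next_alias);
        *next_alias += 1;
        format!("({inner}) AS t{next_alias}")
    }
}

fn main() {
    let plan = Plan::Aggregate {
        group_by: vec!["d".into()],
        aggs: vec!["sum(a)".into(), "avg(b)".into(), "min(c)".into()],
        input: Box::new(Plan::Filter {
            predicate: "a > 10".into(),
            input: Box::new(Plan::Projection {
                exprs: vec!["a".into(), "b".into(), "c".into(), "d".into()],
                input: Box::new(Plan::TableScan { table: "postgres.my_table".into() }),
            }),
        }),
    };
    println!("{}", plan.to_sql());
}
```

On the plan from the earlier comment (with `a > 10` as a placeholder filter) this produces three levels of subqueries for what could be a single SELECT ... WHERE ... GROUP BY.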

I appreciate that I've been very inactive lately, but I'm mentioning this out of interest, because when I saw the PR a few weeks ago I also thought of Flight SQL becoming the standard interface for connecting to various DBs. The alternative of supporting N interfaces can get daunting. connector-x would help with that to some extent, but I would prefer that DB implementations end up supporting Flight SQL natively.

@e-dard
Contributor

e-dard commented Feb 16, 2022

That's super interesting @nevi-me!

I'm primarily interested in making it possible for services/projects that can talk to PG to also talk to IOx, where IOx really means DataFusion. My approach has been to implement the PG wire protocol and then hand off a subset of supported PG behaviours (e.g., queries) to a backend over Flight, then translate the responses (record batches) into PG responses.
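For the last step, translating record batches into PG responses, each row becomes a `DataRow` message. A minimal sketch of that encoding, based on the DataRow layout in the PostgreSQL frontend/backend protocol documentation: the type byte `'D'`, an Int32 length that counts itself but not the type byte, an Int16 column count, then for each column an Int32 byte length (-1 for NULL) followed by the value bytes. It assumes every column has already been rendered to PostgreSQL's text format (a real bridge would walk the Arrow arrays to do that) and omits the surrounding `RowDescription` and `CommandComplete` messages.

```rust
/// Encode one row as a PostgreSQL `DataRow` ('D') backend message using the
/// text format for every column. `None` encodes SQL NULL.
fn encode_data_row(values: &[Option<&str>]) -> Vec<u8> {
    let mut body = Vec::new();
    // Int16: number of column values that follow.
    let ncols = i16::try_from(values.len()).expect("too many columns for one DataRow");
    body.extend_from_slice(&ncols.to_be_bytes());
    for value in values {
        match value {
            // Int32 length of -1 signals NULL; no value bytes follow.
            None => body.extend_from_slice(&(-1i32).to_be_bytes()),
            Some(text) => {
                let len = i32::try_from(text.len()).expect("value too large");
                body.extend_from_slice(&len.to_be_bytes());
                body.extend_from_slice(text.as_bytes());
            }
        }
    }
    let mut msg = Vec::with_capacity(5 + body.len());
    msg.push(b'D');
    // Int32 message length counts itself (4 bytes) but not the type byte.
    let len = i32::try_from(body.len() + 4).expect("message too large");
    msg.extend_from_slice(&len.to_be_bytes());
    msg.extend_from_slice(&body);
    msg
}

fn main() {
    // A two-column row: the integer 42 in text form, then a NULL.
    let msg = encode_data_row(&[Some("42"), None]);
    println!("{msg:?}");
}
```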

I suspect (assume) a lot of the mapping between PG commands and Flight SQL commands is the same, so making progress on a Rust implementation of the Flight SQL spec will get a lot of that work done.

Whilst Flight SQL is intended to replace heterogeneous client/server DB protocols, the existing ones aren't going anywhere any time soon. I think Flight SQL will make it easier to map from specific DB protocols to whatever speaks Flight SQL.

@nevi-me
Contributor

nevi-me commented Feb 16, 2022

> I'm primarily interested in making it possible for services/projects that can talk to PG to also talk to IOx, where IOx really means DataFusion. My approach has been to implement the PG wire protocol and then hand off a subset of supported PG behaviours (e.g., queries) to a backend over Flight, then translate the responses (record batches) into PG responses.

That's a great use case. Supporting the Postgres protocol makes sense, as tools (especially BI tools) that speak the protocol could then treat IOx as a source, similar to CockroachDB and others.


I've dumped rough drafts of the code I mentioned here: https://github.com/nevi-me/datafusion-rdbms-ext. It's nothing of decent quality, but it could serve as motivation for someone who has the interest and bandwidth to make DataFusion > RDBMS happen.

@GavinRay97

@nevi-me That link is super interesting, thank you!

@wangfenjin
Contributor

wangfenjin commented Feb 17, 2022

@nevi-me I checked your code; the idea is to have a to_sql() method on LogicalPlan. I think it's a good idea. In fact, yesterday I also tried to find this kind of method in DataFusion's codebase, but had no luck.

I suggest we implement this method in DataFusion, so that it can at least convert to a SQL dialect that DataFusion itself understands.

@seddonm1
Contributor

I have an implementation of DataFusion querying directly against Postgres that I can share. It also uses the catalog, and it allows joining a Postgres table to a Parquet table, including Parquet read from an S3-compatible source. It would be easy enough to define a proper trait so that multiple implementations could be written.
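The catalog-based approach could sit behind a small trait along these lines, so that Postgres and other databases plug in the same way. This is only a sketch: `RemoteSqlSource`, `SourceRegistry` and `MockSource` are hypothetical names, rows are plain strings rather than Arrow record batches, and routing is done on the schema part of a qualified `schema.table` name.

```rust
use std::collections::HashMap;

/// Hypothetical interface for a database that can run SQL on DataFusion's behalf.
trait RemoteSqlSource {
    /// Name of the SQL dialect the source speaks, e.g. "postgres".
    fn dialect(&self) -> &str;
    /// Run a (pushed-down) query and return its rows.
    fn execute(&self, sql: &str) -> Result<Vec<Vec<String>>, String>;
}

/// Maps a schema name registered in the catalog to the source that owns it.
#[derive(Default)]
struct SourceRegistry {
    sources: HashMap<String, Box<dyn RemoteSqlSource>>,
}

impl SourceRegistry {
    fn register(&mut self, schema: &str, source: Box<dyn RemoteSqlSource>) {
        self.sources.insert(schema.to_string(), source);
    }

    /// Find the source for a qualified table name such as "postgres.my_table".
    fn source_for(&self, table: &str) -> Option<&dyn RemoteSqlSource> {
        let (schema, _table) = table.split_once('.')?;
        self.sources.get(schema).map(|s| &**s)
    }
}

/// Stand-in implementation that echoes the query instead of contacting a server.
struct MockSource;

impl RemoteSqlSource for MockSource {
    fn dialect(&self) -> &str {
        "postgres"
    }

    fn execute(&self, sql: &str) -> Result<Vec<Vec<String>>, String> {
        Ok(vec![vec![format!("ran: {sql}")]])
    }
}

fn main() {
    let mut registry = SourceRegistry::default();
    registry.register("postgres", Box::new(MockSource));
    if let Some(src) = registry.source_for("postgres.my_table") {
        println!("{} -> {:?}", src.dialect(), src.execute("SELECT a FROM my_table"));
    }
}
```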

I also implemented the auth and part of a Rust Postgres protocol server (so pgcli can connect to Rust), but ran out of time.

@nevi-me
Contributor

nevi-me commented Mar 2, 2022

@seddonm1 does your implementation support Postgres' NUMERIC type? I'm struggling to figure out how to convert it to DataType::Decimal(p, s), and could do with some help.
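One possible approach to the value conversion, sketched under assumptions: decode PostgreSQL's binary NUMERIC format (four big-endian Int16 header words `ndigits`, `weight`, `sign`, `dscale`, followed by `ndigits` base-10000 digit groups) into the unscaled `i128` that an Arrow Decimal128 value with scale `s` stores. The function name is hypothetical. NaN and the infinities (any sign word other than positive/negative) are rejected, as are values that cannot be represented exactly at the requested scale, rather than being rounded silently.

```rust
/// Sign words used by PostgreSQL's binary NUMERIC format.
const NUMERIC_POS: u16 = 0x0000;
const NUMERIC_NEG: u16 = 0x4000;

/// Decode a binary-format PostgreSQL NUMERIC into the unscaled integer `v`
/// such that the number equals `v / 10^scale` (what Arrow's Decimal128 stores).
/// Returns None for NaN/infinity, truncated input, overflow, or when the value
/// has more fractional digits than `scale` allows (no silent rounding).
fn pg_numeric_to_i128(bytes: &[u8], scale: u32) -> Option<i128> {
    // Every field is a big-endian 16-bit integer; `i` indexes 16-bit words.
    let word = |i: usize| -> Option<i16> {
        let b = bytes.get(2 * i..2 * i + 2)?;
        Some(i16::from_be_bytes([b[0], b[1]]))
    };
    let ndigits = usize::try_from(word(0)?).ok()?;
    let weight = i64::from(word(1)?);
    let negative = match word(2)? as u16 {
        NUMERIC_POS => false,
        NUMERIC_NEG => true,
        _ => return None, // NaN or +/-Infinity
    };
    // word(3) is dscale, the display scale; it is not needed for the value.

    // Concatenate the base-10000 digit groups into a single integer.
    let mut value: i128 = 0;
    for i in 0..ndigits {
        let digit = word(4 + i)?;
        if !(0..10_000).contains(&digit) {
            return None;
        }
        value = value.checked_mul(10_000)?.checked_add(i128::from(digit))?;
    }
    if value == 0 {
        return Some(0);
    }

    // The last digit group has base-10000 weight (weight - ndigits + 1);
    // shift by that many powers of 10^4, then by 10^scale.
    let exp10 = 4 * (weight - ndigits as i64 + 1) + i64::from(scale);
    let unscaled = if exp10 >= 0 {
        value.checked_mul(10i128.checked_pow(u32::try_from(exp10).ok()?)?)?
    } else {
        let divisor = 10i128.checked_pow(u32::try_from(-exp10).ok()?)?;
        if value % divisor != 0 {
            return None; // would drop nonzero fractional digits
        }
        value / divisor
    };
    Some(if negative { -unscaled } else { unscaled })
}

fn main() {
    // 123.45 is sent as ndigits=2, weight=0, sign=positive, dscale=2, digits [123, 4500].
    let bytes: Vec<u8> = [2i16, 0, 0, 2, 123, 4500].iter().flat_map(|w| w.to_be_bytes()).collect();
    println!("{:?}", pg_numeric_to_i128(&bytes, 2)); // Some(12345)
}
```

Choosing `p` is a separate decision: the value itself does not carry a precision, so it has to come from the column's declared type, with a fixed maximum (38 for Decimal128) as a fallback for unconstrained NUMERIC columns.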

@alamb
Contributor Author

alamb commented Mar 6, 2022

FWIW there is a proposed implementation in #1386

@alamb alamb mentioned this issue Dec 8, 2022
@alamb
Contributor Author

alamb commented Dec 8, 2022

I believe we have decided to push forward with Flight SQL in the arrow-flight crate. I am starting to organize our work in #3301, so I will close this ticket.

@alamb alamb closed this as completed Dec 8, 2022

7 participants