Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] fix/postgresql-provider-issues #1492

Open
wants to merge 60 commits into
base: branch-21.06
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
a4e87b0
[feature/table-from-postgresl-sqlite] create string container for sql…
gcca Apr 9, 2021
4c22705
[feature/table-from-postgresl-sqlite] add string parse for sqlite
gcca Apr 9, 2021
2da0476
[feature/table-from-postgresl-sqlite] parse standard string column name
gcca Apr 9, 2021
b60d762
[feature/table-from-postgresl-sqlite] fix bad column order when schem…
gcca Apr 11, 2021
b2945e5
[feature/table-from-postgresl-sqlite] update sqlite parser to new api
gcca Apr 12, 2021
94c276c
[feature/table-from-postgresl-sqlite] update sqlite parser api for ints
gcca Apr 12, 2021
7eae9de
[feature/table-from-postgresl-sqlite] update sqlite parser api
gcca Apr 12, 2021
9368820
[feature/table-from-postgresl-sqlite] read row for sqlite parser
gcca Apr 12, 2021
c746334
[feature/table-from-postgresl-sqlite] typo
gcca Apr 12, 2021
f002fb2
[feature/table-from-postgresl-sqlite] sql engine validation to parse …
gcca Apr 12, 2021
a34c084
[feature/table-from-postgresl-sqlite] update postgresql parser to new…
gcca Apr 13, 2021
77759bc
[feature/table-from-postgresl-sqlite] remove context name
gcca Apr 13, 2021
9f51021
[feature/table-from-postgresl-sqlite] add sql loop parsing
gcca Apr 13, 2021
9680a3e
[feature/table-from-postgresl-sqlite] update readme with mysql and po…
gcca Apr 13, 2021
9116fea
[feature/table-from-postgresl-sqlite] add parse types for postregsql
gcca Apr 13, 2021
a3efb41
[feature/table-from-postgresl-sqlite] add postgresql support when par…
gcca Apr 13, 2021
511ae58
[feature/table-from-postgresl-sqlite] add postgresql to table nodes
gcca Apr 13, 2021
73af7bb
[feature/table-from-postgresl-sqlite] postgresql read sql loop
gcca Apr 13, 2021
b031ab1
[feature/table-from-postgresl-sqlite] add condition for postgresql an…
gcca Apr 13, 2021
1569705
Merge remote-tracking branch 'upstream/branch-0.20' into feature/tabl…
gcca Apr 13, 2021
9d9fa51
[feature/table-from-postgresl-sqlite] update readme to install postgr…
gcca Apr 13, 2021
345a981
[feature/table-from-postgresl-sqlite] update readme to install postgr…
gcca Apr 13, 2021
64d8166
add basic predicate push down implementation for sql providers
aucahuasi Apr 14, 2021
935a921
Merge remote-tracking branch 'upstream/branch-0.19' into feature/pred…
aucahuasi Apr 14, 2021
95d3ba6
[feature/table-from-postgresl-sqlite] add support for postgresql date…
gcca Apr 14, 2021
c3e7fe3
[feature/table-from-postgresl-sqlite] add support for postgresql time…
gcca Apr 14, 2021
b7f3049
[feature/table-from-postgresl-sqlite] update column date timestamp fo…
gcca Apr 14, 2021
4772c83
Improve the sql transpiler implementation and avoid filter in the ker…
aucahuasi Apr 14, 2021
3c77fa5
improve the sql transpiler framework
aucahuasi Apr 15, 2021
38935b7
fixes for the sql transpiler (unary ops and predicate generation)
aucahuasi Apr 15, 2021
f542049
fix sql transpiler generation step for n-ary operators
aucahuasi Apr 15, 2021
2572627
[feature/table-from-postgresl-sqlite] dont ignore another text fields…
gcca Apr 15, 2021
469897e
fixes and load pg data for e2e
aucahuasi Apr 15, 2021
dc5485f
Merge branch 'feature/table-from-postgresl-sqlite' of https://github.…
aucahuasi Apr 15, 2021
7342813
[feature/table-from-postgresl-sqlite] add sqlite e2e support
gcca Apr 15, 2021
fe5c401
[feature/table-from-postgresl-sqlite] update readme for sql environme…
gcca Apr 15, 2021
ce66da2
Merge branch 'feature/table-from-postgresl-sqlite' of github.com:gcca…
gcca Apr 15, 2021
e1dbb9d
[feature/table-from-postgresl-sqlite] add char to hint sqlite types
gcca Apr 15, 2021
5f5a7a4
Merge remote-tracking branch 'cris/feature/table-from-postgresl-sqlit…
aucahuasi Apr 16, 2021
d3fc4bc
Merge remote-tracking branch 'upstream/branch-0.20' into feature/pred…
aucahuasi Apr 16, 2021
a665223
Merge remote-tracking branch 'upstream/branch-0.20' into feature/pred…
aucahuasi Apr 16, 2021
bbee3ec
Merge branch 'feature/predicate-pushdown-providers' of https://github…
aucahuasi Apr 16, 2021
f4e5264
improve predicate pushdow internal api
aucahuasi Apr 16, 2021
bddd445
[feature/table-from-postgresl-sqlite] update parser type names
gcca Apr 16, 2021
d9a3d9c
[feature/table-from-postgresl-sqlite] update provider construction
gcca Apr 16, 2021
5270ab2
[feature/table-from-postgresl-sqlite] update sqlite provider to check…
gcca Apr 16, 2021
86deba7
clean unnecesary code
aucahuasi Apr 16, 2021
8669c4d
chgl
aucahuasi Apr 16, 2021
96271cf
dlb t
aucahuasi Apr 16, 2021
243a586
[feature/table-from-postgresl-sqlite] update sqlite provider for dist…
gcca Apr 16, 2021
aeec007
improve e2e! add generic template for test runner
aucahuasi Apr 17, 2021
da42dce
[feature/table-from-postgresl-sqlite] update sqlite provider to load …
gcca Apr 17, 2021
8239b91
[feature/table-from-postgresl-sqlite] update postgresql provider for …
gcca Apr 19, 2021
7a9d717
Merge branch 'feature/predicate-pushdown-providers' of github.com:auc…
gcca Apr 19, 2021
928bf53
[feature/table-from-postgresl-sqlite] update postgresql parser for nu…
gcca Apr 19, 2021
c51b670
[feature/table-from-postgresl-sqlite] update postgresql to manage cha…
gcca Apr 20, 2021
1a0ae28
[feature/table-from-postgresl-sqlite] add order by argumento to selec…
gcca Apr 21, 2021
2956ea2
[feature/table-from-postgresl-sqlite] check has_next using count(*) t…
gcca Apr 21, 2021
7267c9c
[feature/table-from-postgresl-sqlite] update sqlite provider to consi…
gcca Apr 21, 2021
6f1c5fc
[feature/table-from-postgresl-sqlite] remove some tries to fix postgr…
gcca Apr 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- #1396 Create tables from other RDBMS
- #1427 Support for CONCAT alias operator
- #1424 Add get physical plan with explain
- #1472 Implement predicate pushdown for data providers

## Improvements
- #1325 Refactored CacheMachine.h and CacheMachine.cpp
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ bc.sql('SELECT passenger_count, trip_distance FROM taxi LIMIT 2')
## Documentation
You can find our full documentation at [docs.blazingdb.com](https://docs.blazingdb.com/docs).

# Prerequisites
# Prerequisites
* [Anaconda or Miniconda](https://docs.conda.io/projects/conda/en/latest/user-guide/install/linux.html) installed
* OS Support
* Ubuntu 16.04/18.04 LTS
Expand All @@ -96,7 +96,7 @@ Where $CUDA_VERSION is 10.1, 10.2 or 11.0 and $PYTHON_VERSION is 3.7 or 3.8
*For example for CUDA 10.1 and Python 3.7:*
```bash
conda install -c blazingsql -c rapidsai -c nvidia -c conda-forge -c defaults blazingsql python=3.7 cudatoolkit=10.1
```
```

## Nightly Version
```bash
Expand Down
20 changes: 18 additions & 2 deletions engine/src/cython/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
#include "../io/data_provider/sql/MySQLDataProvider.h"
#endif

#ifdef POSTGRESQL_SUPPORT
#include "../io/data_parser/sql/PostgreSQLParser.h"
#include "../io/data_provider/sql/PostgreSQLDataProvider.h"
#endif

#ifdef SQLITE_SUPPORT
#include "../io/data_parser/sql/SQLiteParser.h"
#include "../io/data_provider/sql/SQLiteDataProvider.h"
Expand Down Expand Up @@ -88,12 +93,23 @@ std::pair<std::vector<ral::io::data_loader>, std::vector<ral::io::Schema>> get_l
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support MySQL integration");
#endif
} else if(fileType == ral::io::DataType::SQLITE) {
} else if(fileType == ral::io::DataType::POSTGRESQL) {
#ifdef POSTGRESQL_SUPPORT
parser = std::make_shared<ral::io::postgresql_parser>();
auto sql = ral::io::getSqlInfo(args_map);
provider = std::make_shared<ral::io::postgresql_data_provider>(sql, total_number_of_nodes, self_node_idx);
isSqlProvider = true;
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support PostgreSQL integration");
#endif
} else if(fileType == ral::io::DataType::SQLITE) {
#ifdef SQLITE_SUPPORT
parser = std::make_shared<ral::io::sqlite_parser>();
auto sql = ral::io::getSqlInfo(args_map);
provider = std::make_shared<ral::io::sqlite_data_provider>(sql, total_number_of_nodes, self_node_idx);
isSqlProvider = true;
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support SQLite integration");
#endif
}

Expand Down Expand Up @@ -184,7 +200,7 @@ std::shared_ptr<ral::cache::graph> runGenerateGraph(uint32_t masterIndex,
{
using blazingdb::manager::Context;
using blazingdb::transport::Node;

auto& communicationData = ral::communication::CommunicationData::getInstance();

std::vector<Node> contextNodes;
Expand Down
18 changes: 18 additions & 0 deletions engine/src/cython/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
#include "../io/data_provider/sql/MySQLDataProvider.h"
#endif

#ifdef POSTGRESQL_SUPPORT
#include "../io/data_parser/sql/PostgreSQLParser.h"
#include "../io/data_provider/sql/PostgreSQLDataProvider.h"
#endif

#ifdef SQLITE_SUPPORT
#include "../io/data_parser/sql/SQLiteParser.h"
#include "../io/data_provider/sql/SQLiteDataProvider.h"
Expand Down Expand Up @@ -71,6 +76,17 @@ TableSchema parseSchema(std::vector<std::string> files,
parser = std::make_shared<ral::io::mysql_parser>();
auto sql = ral::io::getSqlInfo(args_map);
provider = std::make_shared<ral::io::mysql_data_provider>(sql, 0, 0);
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support MySQL integration");
#endif
isSqlProvider = true;
} else if(fileType == ral::io::DataType::POSTGRESQL) {
#ifdef POSTGRESQL_SUPPORT
parser = std::make_shared<ral::io::postgresql_parser>();
auto sql = ral::io::getSqlInfo(args_map);
provider = std::make_shared<ral::io::postgresql_data_provider>(sql, 0, 0);
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support PostgreSQL integration");
#endif
isSqlProvider = true;
} else if(fileType == ral::io::DataType::SQLITE) {
Expand All @@ -79,6 +95,8 @@ TableSchema parseSchema(std::vector<std::string> files,
auto sql = ral::io::getSqlInfo(args_map);
provider = std::make_shared<ral::io::sqlite_data_provider>(sql, 0, 0);
isSqlProvider = true;
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support SQLite integration");
#endif
}

Expand Down
37 changes: 34 additions & 3 deletions engine/src/execution_graph/logic_controllers/BatchProcessing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@
#include "io/data_provider/sql/MySQLDataProvider.h"
#endif

// TODO percy
//#include "io/data_parser/sql/PostgreSQLParser.h"
#ifdef POSTGRESQL_SUPPORT
#include "io/data_provider/sql/PostgreSQLDataProvider.h"
#endif

#ifdef SQLITE_SUPPORT
#include "io/data_provider/sql/SQLiteDataProvider.h"
#endif

#include "parser/expression_utils.hpp"
#include "taskflow/executor.h"
Expand Down Expand Up @@ -128,6 +133,18 @@ TableScan::TableScan(std::size_t kernel_id, const std::string & queryString, std
ral::io::set_sql_projections<ral::io::mysql_data_provider>(provider.get(), get_projections_wrapper(schema.get_num_columns()));
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support MySQL integration");
#endif
} else if (parser->type() == ral::io::DataType::POSTGRESQL) {
#ifdef POSTGRESQL_SUPPORT
ral::io::set_sql_projections<ral::io::postgresql_data_provider>(provider.get(), get_projections_wrapper(schema.get_num_columns()));
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support PostgreSQL integration");
#endif
} else if (parser->type() == ral::io::DataType::SQLITE) {
#ifdef SQLITE_SUPPORT
ral::io::set_sql_projections<ral::io::sqlite_data_provider>(provider.get(), get_projections_wrapper(schema.get_num_columns()));
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support SQLite integration");
#endif
} else {
num_batches = provider->get_num_handles();
Expand Down Expand Up @@ -237,6 +254,7 @@ BindableTableScan::BindableTableScan(std::size_t kernel_id, const std::string &
: kernel(kernel_id, queryString, context, kernel_type::BindableTableScanKernel), provider(provider), parser(parser), schema(schema) {
this->query_graph = query_graph;
this->filtered = is_filtered_bindable_scan(expression);
this->predicate_pushdown_done = false;

if(parser->type() == ral::io::DataType::CUDF || parser->type() == ral::io::DataType::DASK_CUDF){
num_batches = std::max(provider->get_num_handles(), (size_t)1);
Expand All @@ -263,8 +281,21 @@ BindableTableScan::BindableTableScan(std::size_t kernel_id, const std::string &
} else if (parser->type() == ral::io::DataType::MYSQL) {
#ifdef MYSQL_SUPPORT
ral::io::set_sql_projections<ral::io::mysql_data_provider>(provider.get(), get_projections_wrapper(schema.get_num_columns(), queryString));
predicate_pushdown_done = ral::io::set_sql_predicate_pushdown<ral::io::mysql_data_provider>(provider.get(), queryString);
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support MySQL integration");
#endif
} else if (parser->type() == ral::io::DataType::POSTGRESQL) {
#ifdef POSTGRESQL_SUPPORT
ral::io::set_sql_projections<ral::io::postgresql_data_provider>(provider.get(), get_projections_wrapper(schema.get_num_columns(), queryString));
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support PostgreSQL integration");
#endif
} else if (parser->type() == ral::io::DataType::SQLITE) {
#ifdef SQLITE_SUPPORT
ral::io::set_sql_projections<ral::io::sqlite_data_provider>(provider.get(), get_projections_wrapper(schema.get_num_columns(), queryString));
#else
throw std::runtime_error("ERROR: This BlazingSQL version doesn't support SQLite integration");
#endif
} else {
num_batches = provider->get_num_handles();
Expand All @@ -278,7 +309,7 @@ ral::execution::task_result BindableTableScan::do_process(std::vector< std::uniq
std::unique_ptr<ral::frame::BlazingTable> filtered_input;

try{
if(this->filtered) {
if(this->filtered && !this->predicate_pushdown_done) {
filtered_input = ral::processor::process_filter(input->toBlazingTableView(), expression, this->context.get());
filtered_input->setNames(fix_column_aliases(filtered_input->names(), expression));
output->addToCache(std::move(filtered_input));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class BindableTableScan : public kernel {
size_t file_index = 0;
size_t num_batches;
bool filtered;
bool predicate_pushdown_done;
};

/**
Expand Down
4 changes: 2 additions & 2 deletions engine/src/io/data_parser/ArgsUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ sql_info getSqlInfo(std::map<std::string, std::string> &args_map) {
if (args_map.find("password") != args_map.end()) {
sql.password = args_map.at("password");
}
if (args_map.find("schema") != args_map.end()) {
sql.schema = args_map.at("schema");
if (args_map.find("database") != args_map.end()) {
sql.schema = args_map.at("database");
}
if (args_map.find("table") != args_map.end()) {
sql.table = args_map.at("table");
Expand Down
34 changes: 27 additions & 7 deletions engine/src/io/data_parser/sql/AbstractSQLParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,35 @@ std::unique_ptr<ral::frame::BlazingTable> abstractsql_parser::parse_batch(
{
void *src = nullptr;

if (type() == DataType::MYSQL) {
#if defined(MYSQL_SUPPORT)
src = handle.sql_handle.mysql_resultset.get();
#elif defined(POSTGRESQL_SUPPORT)
src = handle.sql_handle.postgresql_result.get();
#elif defined(SQLITE_SUPPORT)
src = handle.sql_handle.sqlite_statement.get();
src = handle.sql_handle.mysql_resultset.get();
#else
throw std::runtime_error(
"Unsupported MySQL parser for this BlazingSQL version");
#endif
}

if (type() == DataType::POSTGRESQL) {
#if defined(POSTGRESQL_SUPPORT)
src = handle.sql_handle.postgresql_result.get();
#else
throw std::runtime_error(
"Unsupported PostgreSQL parser for this BlazingSQL version");
#endif
}

return this->parse_raw_batch(src, schema, column_indices, row_groups, handle.sql_handle.row_count);
if (type() == DataType::SQLITE) {
#if defined(SQLITE_SUPPORT)
src = handle.sql_handle.sqlite_statement.get();
#else
throw std::runtime_error(
"Unsupported Sqlite3 parser for this BlazingSQL version");
#endif
}

return this->parse_raw_batch(
src, schema, column_indices, row_groups, handle.sql_handle.row_count);
}

void abstractsql_parser::parse_schema(ral::io::data_handle handle, ral::io::Schema & schema) {
Expand Down Expand Up @@ -277,7 +297,7 @@ std::pair<std::vector<void*>, std::vector<std::vector<cudf::bitmask_type>>> init
case cudf::type_id::NUM_TYPE_IDS: {} break;
}
}

return std::make_pair(host_cols, null_masks);
}

Expand Down
3 changes: 1 addition & 2 deletions engine/src/io/data_parser/sql/AbstractSQLParser.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* Copyright 2021 BlazingDB, Inc.
* Copyright 2021 Percy Camilo Triveño Aucahuasi <[email protected]>
* Copyright 2021 Percy Camilo Triveño Aucahuasi <[email protected]>
*/

#ifndef _ABSTRACTSQLPARSER_H_
Expand Down
Loading