From bbbc4e136901cfd52d1077c30ddc889baaf93f50 Mon Sep 17 00:00:00 2001
From: Rommel
Date: Wed, 14 Apr 2021 10:26:06 -0500
Subject: [PATCH 1/3] Setting csv chunk size default to 256MB

---
 engine/bsql_engine/io/io.pyx            |  3 ++-
 engine/src/cython/io.cpp                | 19 +++++++++++++++++++
 engine/src/io/Schema.cpp                |  6 +++++-
 engine/src/io/Schema.h                  |  4 +++-
 engine/src/io/data_parser/CSVParser.cpp | 19 ++++++++++++++++---
 pyblazing/pyblazing/apiv2/context.py    |  4 +++-
 6 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/engine/bsql_engine/io/io.pyx b/engine/bsql_engine/io/io.pyx
index 0c5b2826c..0e23e56e2 100644
--- a/engine/bsql_engine/io/io.pyx
+++ b/engine/bsql_engine/io/io.pyx
@@ -400,7 +400,8 @@ cpdef parseSchemaCaller(fileList, file_format_hint, args, extra_columns, ignore_
     return_object['types'].append((type))
     return_object['names'] = tableSchema.names
     return_object['calcite_to_file_indices']= tableSchema.calcite_to_file_indices
-    return_object['has_header_csv']= tableSchema.has_header_csv
+    return_object['has_header_csv'] = tableSchema.has_header_csv
+    return_object['row_groups_ids'] = tableSchema.row_groups_ids
     return return_object

diff --git a/engine/src/cython/io.cpp b/engine/src/cython/io.cpp
index 47e678a9f..445cb1ab4 100644
--- a/engine/src/cython/io.cpp
+++ b/engine/src/cython/io.cpp
@@ -106,6 +106,25 @@ TableSchema parseSchema(std::vector<std::string> files,
 	tableSchema.calcite_to_file_indices = schema.get_calcite_to_file_indices();
 	tableSchema.in_file = schema.get_in_file();
 	tableSchema.has_header_csv = schema.get_has_header_csv();
+	//tableSchema.row_groups_ids = schema.get_rowgroups();
+
+	if(fileType == ral::io::DataType::CSV) {
+		// duplicating the csv files
+		if(!schema.get_rowgroups().empty()){
+			std::vector<std::vector<int>> new_row_groups_ids;
+
+			for (auto row_groups : schema.get_rowgroups()){
+				for (auto row_group_id : row_groups){
+					std::vector<int> new_vector = {row_group_id};
+					new_row_groups_ids.push_back(new_vector);
+				}
+			}
+
+			auto size_chunks = schema.get_rowgroups()[0].size();
+			tableSchema.files.resize(size_chunks, tableSchema.files[0]);
+			tableSchema.row_groups_ids = new_row_groups_ids;
+		}
+	}
 	return tableSchema;
 }

diff --git a/engine/src/io/Schema.cpp b/engine/src/io/Schema.cpp
index 6eabdc733..60a0a4dc2 100644
--- a/engine/src/io/Schema.cpp
+++ b/engine/src/io/Schema.cpp
@@ -133,7 +133,11 @@ std::vector<int> Schema::get_rowgroup_ids(size_t file_index) const {
 	}
 }
 
-std::vector<std::vector<int>> Schema::get_rowgroups(){
+void Schema::set_row_groups_ids(const std::vector<std::vector<int>> row_groups_ids) {
+	this->row_groups_ids = row_groups_ids;
+}
+
+std::vector<std::vector<int>> Schema::get_rowgroups() const {
 	return this->row_groups_ids;
 }

diff --git a/engine/src/io/Schema.h b/engine/src/io/Schema.h
index d44df0ef0..3ee3fa0b2 100644
--- a/engine/src/io/Schema.h
+++ b/engine/src/io/Schema.h
@@ -55,7 +55,7 @@ class Schema {
 	size_t get_num_columns() const;
 
 	std::vector<int> get_rowgroup_ids(size_t file_index) const;
-	std::vector<std::vector<int>> get_rowgroups();
+	std::vector<std::vector<int>> get_rowgroups() const;
 	int get_total_num_rowgroups();
 
 	bool get_has_header_csv() const;
@@ -70,6 +70,8 @@ class Schema {
 	std::unique_ptr<ral::frame::BlazingTable> makeEmptyBlazingTable(const std::vector<int> & column_indices) const;
 
+	void set_row_groups_ids(const std::vector<std::vector<int>> row_groups_ids);
+
 	inline bool operator==(const Schema & rhs) const {
 		return (this->names == rhs.names) && (this->types == rhs.types);
 	}

diff --git a/engine/src/io/data_parser/CSVParser.cpp b/engine/src/io/data_parser/CSVParser.cpp
index d41bd44ba..b310541fa 100644
--- a/engine/src/io/data_parser/CSVParser.cpp
+++ b/engine/src/io/data_parser/CSVParser.cpp
@@ -13,6 +13,8 @@
 #include
 #include "ArgsUtil.h"
 
+#define DEFAULT_CSV_CHUNK_SIZE 268435456 //256MB
+
 #define checkError(error, txt) \
 	if(error != GDF_SUCCESS) { \
 		std::cerr << "ERROR: " << error << " in " << txt << std::endl; \
@@ -53,12 +55,16 @@ std::unique_ptr<ral::frame::BlazingTable> csv_parser::parse_batch(
 	} else
 		args.set_header(-1);
 
-	// Overrride `byte_range_offset` and `byte_range_size`
+	// Override `byte_range_offset` and `byte_range_size`
 	auto iter = args_map.find("max_bytes_chunk_read");
 	if(iter != args_map.end() && !row_groups.empty()) {
 		auto chunk_size = std::stoll(iter->second);
 		args.set_byte_range_offset(chunk_size * row_groups[0]);
 		args.set_byte_range_size(chunk_size);
+	}else{
+		//const auto row_groups = schema.get_rowgroups(); TODO duplicated values
+		args.set_byte_range_offset(DEFAULT_CSV_CHUNK_SIZE * (row_groups[0]));
+		args.set_byte_range_size(DEFAULT_CSV_CHUNK_SIZE);
 	}
 
 	cudf::io::table_with_metadata csv_table = cudf::io::read_csv(args);
@@ -113,15 +119,22 @@ void csv_parser::parse_schema(
 		args.set_nrows(1);
 		args.set_skipfooter(0);
 	}
+	// here we can infer the file size and set the nrowgroups
+	int num_row_groups = std::ceil(num_bytes / (double) DEFAULT_CSV_CHUNK_SIZE);
+
+	std::vector<int> row_groups_ids(num_row_groups);
+	std::iota(std::begin(row_groups_ids), std::end(row_groups_ids), 0);
+	schema.set_row_groups_ids({row_groups_ids});
+
 	cudf::io::table_with_metadata table_out = cudf::io::read_csv(args);
 	file->Close();
 
 	for(int i = 0; i < table_out.tbl->num_columns(); i++) {
 		cudf::type_id type = table_out.tbl->get_column(i).type().id();
-		size_t file_index = i;
+		size_t column_index = i;
 		bool is_in_file = true;
 		std::string name = table_out.metadata.column_names.at(i);
-		schema.add_column(name, type, file_index, is_in_file);
+		schema.add_column(name, type, column_index, is_in_file);
 	}
 }

diff --git a/pyblazing/pyblazing/apiv2/context.py b/pyblazing/pyblazing/apiv2/context.py
index 4ff77d212..60f2218f5 100644
--- a/pyblazing/pyblazing/apiv2/context.py
+++ b/pyblazing/pyblazing/apiv2/context.py
@@ -910,7 +910,6 @@ def kwargs_validation(kwargs, bc_api_str):
         "num_rows",
         "use_index",
         "max_bytes_chunk_read",  # Used for reading CSV files in chunks
-        "local_files",
         "get_metadata",
     ]
     params_info = "https://docs.blazingdb.com/docs/create_table"
@@ -2482,6 +2481,9 @@ def create_table(self, table_name, input, **kwargs):
                 row_groups_ids.append(row_group_ids)
                 table.row_groups_ids = row_groups_ids
 
+            if parsedSchema["file_type"] == DataType.CSV:
+                table.row_groups_ids = parsedSchema["row_groups_ids"]
+
         elif isinstance(input, dask_cudf.core.DataFrame):
             table = BlazingTable(
                 table_name, input, DataType.DASK_CUDF, client=self.dask_client

From 6b6d2e4270e045340701f419c038a1c773f93c33 Mon Sep 17 00:00:00 2001
From: Rommel
Date: Wed, 14 Apr 2021 10:33:41 -0500
Subject: [PATCH 2/3] Edit Changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bb7568a4e..6ea052979 100755
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 
 ## Improvements
 
+- #1468 Setting CSV chunk size default to 256MB

From 8c1bcbeb579d7e91c5f27bee34e9854017b021df Mon Sep 17 00:00:00 2001
From: Rommel
Date: Wed, 14 Apr 2021 12:04:24 -0500
Subject: [PATCH 3/3] Improving the logic according to the issue

---
 engine/src/io/data_parser/CSVParser.cpp | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/engine/src/io/data_parser/CSVParser.cpp b/engine/src/io/data_parser/CSVParser.cpp
index f572053e3..58a118af9 100644
--- a/engine/src/io/data_parser/CSVParser.cpp
+++ b/engine/src/io/data_parser/CSVParser.cpp
@@ -56,15 +56,19 @@ std::unique_ptr<ral::frame::BlazingTable> csv_parser::parse_batch(
 	else
 		args.set_header(-1);
 
 	// Override `byte_range_offset` and `byte_range_size`
-	auto iter = args_map.find("max_bytes_chunk_read");
-	if(iter != args_map.end() && !row_groups.empty()) {
-		auto chunk_size = std::stoll(iter->second);
-		args.set_byte_range_offset(chunk_size * row_groups[0]);
-		args.set_byte_range_size(chunk_size);
-	}else{
-		//const auto row_groups = schema.get_rowgroups(); TODO duplicated values
-		args.set_byte_range_offset(DEFAULT_CSV_CHUNK_SIZE * (row_groups[0]));
-		args.set_byte_range_size(DEFAULT_CSV_CHUNK_SIZE);
+
+	if(!row_groups.empty()) {
+		auto iter = args_map.find("max_bytes_chunk_read");
+
+		if(iter == args_map.end()){
+			//const auto row_groups = schema.get_rowgroups(); TODO duplicated values
+			args.set_byte_range_offset(DEFAULT_CSV_CHUNK_SIZE * (row_groups[0]));
+			args.set_byte_range_size(DEFAULT_CSV_CHUNK_SIZE);
+		}else if(iter->second != "None"){
+			auto chunk_size = std::stoll(iter->second);
+			args.set_byte_range_offset(chunk_size * row_groups[0]);
+			args.set_byte_range_size(chunk_size);
+		}
 	}
 
 	cudf::io::table_with_metadata csv_table = cudf::io::read_csv(args);
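
---
Usage sketch (editor's note, not part of the patches): taken together, the
series makes the CSV reader plan ceil(file_size / 256MB) single-chunk batches
per file by default (DEFAULT_CSV_CHUNK_SIZE = 268435456 bytes), while
`max_bytes_chunk_read` still overrides the chunk size and a value of "None"
leaves the byte range unset so the whole file is read at once. A minimal
pyblazing sketch of both paths follows; the table names and file path are
illustrative, not taken from the patches:

    from blazingsql import BlazingContext

    bc = BlazingContext()

    # Default behavior after this series: the file is scanned in 256MB
    # byte ranges, one row-group id per chunk.
    bc.create_table("lineitem", "/data/lineitem.csv")

    # Explicit override: read the CSV in 64MB byte ranges instead.
    bc.create_table("lineitem_64mb", "/data/lineitem.csv",
                    max_bytes_chunk_read=67108864)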