diff --git a/CHANGELOG.md b/CHANGELOG.md
index bb7568a4e..6ea052979 100755
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 ## Improvements
+- #1468 Setting CSV chunk size default to 256MB
diff --git a/engine/bsql_engine/io/io.pyx b/engine/bsql_engine/io/io.pyx
index 3bbcaea6c..3ed59de7e 100644
--- a/engine/bsql_engine/io/io.pyx
+++ b/engine/bsql_engine/io/io.pyx
@@ -400,7 +400,8 @@ cpdef parseSchemaCaller(fileList, file_format_hint, args, extra_columns, ignore_
         return_object['types'].append((type))
     return_object['names'] = tableSchema.names
     return_object['calcite_to_file_indices']= tableSchema.calcite_to_file_indices
-    return_object['has_header_csv']= tableSchema.has_header_csv
+    return_object['has_header_csv'] = tableSchema.has_header_csv
+    return_object['row_groups_ids'] = tableSchema.row_groups_ids
     return return_object
diff --git a/engine/src/cython/io.cpp b/engine/src/cython/io.cpp
index 43c42c2c3..da9613499 100644
--- a/engine/src/cython/io.cpp
+++ b/engine/src/cython/io.cpp
@@ -147,6 +147,25 @@ TableSchema parseSchema(std::vector<std::string> files,
     tableSchema.calcite_to_file_indices = schema.get_calcite_to_file_indices();
     tableSchema.in_file = schema.get_in_file();
     tableSchema.has_header_csv = schema.get_has_header_csv();
+    //tableSchema.row_groups_ids = schema.get_rowgroups();
+
+    if(fileType == ral::io::DataType::CSV) {
+        // duplicating the csv files
+        if(!schema.get_rowgroups().empty()){
+            std::vector<std::vector<int>> new_row_groups_ids;
+
+            for (auto row_groups : schema.get_rowgroups()){
+                for (auto row_group_id : row_groups){
+                    std::vector<int> new_vector = {row_group_id};
+                    new_row_groups_ids.push_back(new_vector);
+                }
+            }
+
+            auto size_chunks = schema.get_rowgroups()[0].size();
+            tableSchema.files.resize(size_chunks, tableSchema.files[0]);
+            tableSchema.row_groups_ids = new_row_groups_ids;
+        }
+    }
 
     return tableSchema;
 }
diff --git a/engine/src/io/Schema.cpp b/engine/src/io/Schema.cpp
index 6eabdc733..60a0a4dc2 100644
--- a/engine/src/io/Schema.cpp
+++ b/engine/src/io/Schema.cpp
@@ -133,7 +133,11 @@ std::vector<int> Schema::get_rowgroup_ids(size_t file_index) const {
     }
 }
 
-std::vector<std::vector<int>> Schema::get_rowgroups(){
+void Schema::set_row_groups_ids(const std::vector<std::vector<int>> row_groups_ids) {
+    this->row_groups_ids = row_groups_ids;
+}
+
+std::vector<std::vector<int>> Schema::get_rowgroups() const {
     return this->row_groups_ids;
 }
diff --git a/engine/src/io/Schema.h b/engine/src/io/Schema.h
index d44df0ef0..3ee3fa0b2 100644
--- a/engine/src/io/Schema.h
+++ b/engine/src/io/Schema.h
@@ -55,7 +55,7 @@ class Schema {
     size_t get_num_columns() const;
 
     std::vector<int> get_rowgroup_ids(size_t file_index) const;
-    std::vector<std::vector<int>> get_rowgroups();
+    std::vector<std::vector<int>> get_rowgroups() const;
     int get_total_num_rowgroups();
 
     bool get_has_header_csv() const;
@@ -70,6 +70,8 @@ class Schema {
     std::unique_ptr<ral::frame::BlazingTable> makeEmptyBlazingTable(const std::vector<int> & column_indices) const;
 
+    void set_row_groups_ids(const std::vector<std::vector<int>> row_groups_ids);
+
     inline bool operator==(const Schema & rhs) const {
         return (this->names == rhs.names) && (this->types == rhs.types);
     }
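Note on the `parseSchema` hunk in `engine/src/cython/io.cpp` above: when the source is CSV, a file that `parse_schema` split into N chunks is re-exposed to the Python layer as N logical files, each owning exactly one row-group id, so the existing per-file batch distribution handles CSV chunks without further changes. A minimal standalone sketch of that expansion (illustration only, not engine code; the path and chunk count are made up):

```cpp
// Sketch of the chunk-expansion step: one CSV file whose schema reports
// N row-group ids becomes N logical "files", each with a one-element
// row-group list. Path and chunk count are hypothetical.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main() {
    std::vector<std::string> files = {"/data/big.csv"};         // hypothetical path
    std::vector<std::vector<int>> row_groups = {{0, 1, 2, 3}};  // one file, four 256MB chunks

    // Flatten: every row-group id becomes its own single-element list,
    // as the loop over schema.get_rowgroups() does above.
    std::vector<std::vector<int>> new_row_groups_ids;
    for (const auto & per_file : row_groups) {
        for (int row_group_id : per_file) {
            new_row_groups_ids.push_back({row_group_id});
        }
    }

    // Repeat the file path once per chunk, mirroring
    // tableSchema.files.resize(size_chunks, tableSchema.files[0]).
    std::size_t size_chunks = row_groups[0].size();
    files.resize(size_chunks, files[0]);

    for (std::size_t i = 0; i < files.size(); ++i) {
        std::cout << files[i] << " -> chunk " << new_row_groups_ids[i][0] << "\n";
    }
    return 0;
}
```

Reusing the row-group plumbing that Parquet already flows through means the scheduler needs no CSV-specific notion of a chunk; a chunk is simply "file i, row group j".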
diff --git a/engine/src/io/data_parser/CSVParser.cpp b/engine/src/io/data_parser/CSVParser.cpp
index 716adb5b8..58a118af9 100644
--- a/engine/src/io/data_parser/CSVParser.cpp
+++ b/engine/src/io/data_parser/CSVParser.cpp
@@ -13,6 +13,8 @@
 #include <numeric>
 #include "ArgsUtil.h"
 
+#define DEFAULT_CSV_CHUNK_SIZE 268435456 //256MB
+
 #define checkError(error, txt) \
     if(error != GDF_SUCCESS) { \
         std::cerr << "ERROR: " << error << " in " << txt << std::endl; \
@@ -53,12 +55,20 @@ std::unique_ptr<ral::frame::BlazingTable> csv_parser::parse_batch(
     } else
         args.set_header(-1);
 
-    // Overrride `byte_range_offset` and `byte_range_size`
-    auto iter = args_map.find("max_bytes_chunk_read");
-    if(iter != args_map.end() && !row_groups.empty()) {
-        auto chunk_size = std::stoll(iter->second);
-        args.set_byte_range_offset(chunk_size * row_groups[0]);
-        args.set_byte_range_size(chunk_size);
+    // Override `byte_range_offset` and `byte_range_size`
+
+    if(!row_groups.empty()) {
+        auto iter = args_map.find("max_bytes_chunk_read");
+
+        if(iter == args_map.end()){
+            //const auto row_groups = schema.get_rowgroups(); TODO duplicated values
+            args.set_byte_range_offset(DEFAULT_CSV_CHUNK_SIZE * (row_groups[0]));
+            args.set_byte_range_size(DEFAULT_CSV_CHUNK_SIZE);
+        }else if(iter->second != "None"){
+            auto chunk_size = std::stoll(iter->second);
+            args.set_byte_range_offset(chunk_size * row_groups[0]);
+            args.set_byte_range_size(chunk_size);
+        }
     }
 
     cudf::io::table_with_metadata csv_table = cudf::io::read_csv(args);
@@ -114,15 +124,22 @@ void csv_parser::parse_schema(
         args.set_nrows(1);
         args.set_skipfooter(0);
     }
+    // here we can infer the file size and set the nrowgroups
+    int num_row_groups = std::ceil(num_bytes / (double) DEFAULT_CSV_CHUNK_SIZE);
+
+    std::vector<int> row_groups_ids(num_row_groups);
+    std::iota (std::begin(row_groups_ids), std::end(row_groups_ids), 0);
+    schema.set_row_groups_ids({row_groups_ids});
+
     cudf::io::table_with_metadata table_out = cudf::io::read_csv(args);
     file->Close();
 
     for(int i = 0; i < table_out.tbl->num_columns(); i++) {
         cudf::type_id type = table_out.tbl->get_column(i).type().id();
-        size_t file_index = i;
+        size_t column_index = i;
         bool is_in_file = true;
         std::string name = table_out.metadata.column_names.at(i);
-        schema.add_column(name, type, file_index, is_in_file);
+        schema.add_column(name, type, column_index, is_in_file);
     }
 }
diff --git a/pyblazing/pyblazing/apiv2/context.py b/pyblazing/pyblazing/apiv2/context.py
index eedf8aaa7..1b137773d 100755
--- a/pyblazing/pyblazing/apiv2/context.py
+++ b/pyblazing/pyblazing/apiv2/context.py
@@ -914,7 +914,6 @@ def kwargs_validation(kwargs, bc_api_str):
         "num_rows",
         "use_index",
         "max_bytes_chunk_read",  # Used for reading CSV files in chunks
-        "local_files",
         "get_metadata",
         # SQL Engines arguments
         "from_sql",
@@ -2498,6 +2497,9 @@ def create_table(self, table_name, input, **kwargs):
                 row_groups_ids.append(row_group_ids)
                 table.row_groups_ids = row_groups_ids
 
+            if parsedSchema["file_type"] == DataType.CSV:
+                table.row_groups_ids = parsedSchema["row_groups_ids"]
+
         elif isinstance(input, dask_cudf.core.DataFrame):
             table = BlazingTable(
                 table_name, input, DataType.DASK_CUDF, client=self.dask_client
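For reference, the arithmetic the two `CSVParser.cpp` hunks implement: `parse_schema` assigns each CSV `ceil(num_bytes / 256MB)` row-group ids, and `parse_batch` later maps id `i` to the byte range starting at `i * chunk_size` (honoring a user-supplied `max_bytes_chunk_read` when one is given and not `"None"`). A self-contained sketch of both halves, with a hypothetical `file_size` standing in for the `num_bytes` the parser obtains from the datasource; 64-bit offsets are used here because `chunk_size * id` can exceed 32 bits past the 8th chunk:

```cpp
// Sketch of the chunk arithmetic: derive row-group ids from the file size
// (parse_schema side), then map each id to a byte range (parse_batch side).
// file_size is a made-up value for illustration.
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

constexpr std::int64_t kDefaultCsvChunkSize = 268435456;  // 256MB, mirrors DEFAULT_CSV_CHUNK_SIZE

int main() {
    const std::int64_t file_size = 600000000;  // hypothetical ~600MB CSV

    // parse_schema side: ceil(file_size / chunk_size) row groups, ids 0..n-1.
    const int num_row_groups =
        static_cast<int>(std::ceil(file_size / static_cast<double>(kDefaultCsvChunkSize)));
    std::vector<int> row_groups_ids(num_row_groups);
    std::iota(row_groups_ids.begin(), row_groups_ids.end(), 0);

    // parse_batch side: each id selects one byte range of the file.
    for (const int id : row_groups_ids) {
        const std::int64_t offset = kDefaultCsvChunkSize * static_cast<std::int64_t>(id);
        const std::int64_t size = std::min(kDefaultCsvChunkSize, file_size - offset);
        std::cout << "chunk " << id << ": offset=" << offset << " size=" << size << "\n";
    }
    return 0;
}
```

Note that the hunk above always passes the full chunk size as `byte_range_size` and relies on `cudf::io::read_csv` to stop at end-of-file and keep rows intact across range boundaries; the clamping in this sketch is only to print a realistic size for the last chunk.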