diff --git a/cpp/benchmarks/io/json/json_reader_input.cpp b/cpp/benchmarks/io/json/json_reader_input.cpp index df7b009f627..b511cc0dc9b 100644 --- a/cpp/benchmarks/io/json/json_reader_input.cpp +++ b/cpp/benchmarks/io/json/json_reader_input.cpp @@ -134,8 +134,9 @@ using d_type_list = nvbench::enum_type_list; -using compression_list = - nvbench::enum_type_list; +using compression_list = nvbench::enum_type_list; NVBENCH_BENCH_TYPES(BM_json_read_data_type, NVBENCH_TYPE_AXES(d_type_list, nvbench::enum_type_list)) diff --git a/cpp/src/io/comp/comp.cu b/cpp/src/io/comp/comp.cu index 9af1a2c5f07..cd21a1419b4 100644 --- a/cpp/src/io/comp/comp.cu +++ b/cpp/src/io/comp/comp.cu @@ -16,6 +16,7 @@ #include "comp.hpp" #include "gpuinflate.hpp" +#include "io/utilities/hostdevice_vector.hpp" #include #include @@ -30,99 +31,6 @@ namespace cudf { namespace io { -#pragma pack(push, 1) - -struct gz_file_header_s { - uint8_t id1; // 0x1f - uint8_t id2; // 0x8b - uint8_t comp_mthd; // compression method (0-7=reserved, 8=deflate) - uint8_t flags; // flags (GZIPHeaderFlag) - uint8_t mtime[4]; // If non-zero: modification time (Unix format) // NOLINT - uint8_t xflags; // Extra compressor-specific flags - uint8_t os; // OS id -}; - -struct zip_eocd_s // end of central directory -{ - uint32_t sig; // 0x0605'4b50 - uint16_t disk_id; // number of this disk - uint16_t start_disk; // number of the disk with the start of the central directory - uint16_t num_entries; // number of entries in the central dir on this disk - uint16_t total_entries; // total number of entries in the central dir - uint32_t cdir_size; // size of the central directory - uint32_t cdir_offset; // offset of start of central directory with respect to the starting disk - // number uint16_t comment_len; // comment length (excluded from struct) -}; - -struct zip64_eocdl // end of central dir locator -{ - uint32_t sig; // 0x0706'4b50 - uint32_t disk_start; // number of the disk with the start of the zip64 end of central directory - uint64_t eocdr_ofs; // relative offset of the zip64 end of central directory record - uint32_t num_disks; // total number of disks -}; - -struct zip_cdfh_s // central directory file header -{ - uint32_t sig; // 0x0201'4b50 - uint16_t ver; // version made by - uint16_t min_ver; // version needed to extract - uint16_t gp_flags; // general purpose bit flag - uint16_t comp_method; // compression method - uint16_t file_time; // last mod file time - uint16_t file_date; // last mod file date - uint32_t crc32; // crc - 32 - uint32_t comp_size; // compressed size - uint32_t uncomp_size; // uncompressed size - uint16_t fname_len; // filename length - uint16_t extra_len; // extra field length - uint16_t comment_len; // file comment length - uint16_t start_disk; // disk number start - uint16_t int_fattr; // internal file attributes - uint32_t ext_fattr; // external file attributes - uint32_t hdr_ofs; // relative offset of local header -}; - -struct zip_lfh_s { - uint32_t sig; // 0x0403'4b50 - uint16_t ver; // version needed to extract - uint16_t gp_flags; // general purpose bit flag - uint16_t comp_method; // compression method - uint16_t file_time; // last mod file time - uint16_t file_date; // last mod file date - uint32_t crc32; // crc - 32 - uint32_t comp_size; // compressed size - uint32_t uncomp_size; // uncompressed size - uint16_t fname_len; // filename length - uint16_t extra_len; // extra field length -}; - -struct bz2_file_header_s { - uint8_t sig[3]; // "BZh" // NOLINT - uint8_t blksz; // block size 1..9 in 100kB units (post-RLE) -}; - -#pragma pack(pop) - -struct gz_archive_s { - gz_file_header_s const* fhdr; - uint16_t hcrc16; // header crc16 if present - uint16_t xlen; - uint8_t const* fxtra; // xlen bytes (optional) - uint8_t const* fname; // zero-terminated original filename if present - uint8_t const* fcomment; // zero-terminated comment if present - uint8_t const* comp_data; // compressed data - size_t comp_len; // Compressed data length - uint32_t crc32; // CRC32 of uncompressed data - uint32_t isize; // Input size modulo 2^32 -}; - -struct zip_archive_s { - zip_eocd_s const* eocd; // end of central directory - zip64_eocdl const* eocdl; // end of central dir locator (optional) - zip_cdfh_s const* cdfh; // start of central directory file headers -}; - /** * @brief GZIP host compressor (includes header) */ @@ -153,25 +61,31 @@ std::vector compress_gzip(host_span src, rmm::cuda_ } /** - * @brief SNAPPY host decompressor + * @brief SNAPPY device compressor */ std::vector compress_snappy(host_span src, rmm::cuda_stream_view stream) { - // TODO: to be completed - rmm::device_uvector d_src(src.size(), stream); - cudf::detail::cuda_memcpy_async(device_span{d_src}, src, stream); - rmm::device_uvector const> d_srcspan(1, stream); + rmm::device_uvector d_src(src.size(), stream); + rmm::device_uvector d_dst(src.size(), stream); + + cudf::detail::hostdevice_vector> inputs(1, stream); + inputs[0] = d_src; + inputs.host_to_device_async(stream); - rmm::device_uvector d_dst(src.size(), stream); + cudf::detail::hostdevice_vector> outputs(1, stream); + outputs[0] = d_dst; + outputs.host_to_device_async(stream); - rmm::device_uvector d_status(1, stream); + cudf::detail::hostdevice_vector hd_status(1, stream); + hd_status[0] = {}; + hd_status.host_to_device_async(stream); - /* - gpu_snap(device_span const>{d_src}, - device_span const>{d_dst}, d_status, stream); - */ + gpu_snap(inputs, outputs, hd_status, stream); + hd_status.device_to_host_sync(stream); + CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS, + "snappy compression failed"); std::vector dst(d_dst.size()); cudf::detail::cuda_memcpy(host_span{dst}, device_span{d_dst}, stream); return dst; diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index e1e65649205..681bce8bc0e 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -276,124 +276,6 @@ void cpu_inflate_vector(std::vector& dst, uint8_t const* comp_data, siz CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream"); } -std::vector decompress(compression_type compression, host_span src) -{ - CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); - CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); - - auto raw = src.data(); - uint8_t const* comp_data = nullptr; - size_t comp_len = 0; - size_t uncomp_len = 0; - - switch (compression) { - case compression_type::AUTO: - case compression_type::GZIP: { - gz_archive_s gz; - if (ParseGZArchive(&gz, raw, src.size())) { - compression = compression_type::GZIP; - comp_data = gz.comp_data; - comp_len = gz.comp_len; - uncomp_len = gz.isize; - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - } - case compression_type::ZIP: { - zip_archive_s za; - if (OpenZipArchive(&za, raw, src.size())) { - size_t cdfh_ofs = 0; - for (int i = 0; i < za.eocd->num_entries; i++) { - auto const* cdfh = reinterpret_cast( - reinterpret_cast(za.cdfh) + cdfh_ofs); - int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; - if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { - // Bad cdir - break; - } - // For now, only accept with non-zero file sizes and DEFLATE - if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { - size_t lfh_ofs = cdfh->hdr_ofs; - auto const* lfh = reinterpret_cast(raw + lfh_ofs); - if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && - lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { - if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { - size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; - size_t file_end = file_start + lfh->comp_size; - if (file_end <= src.size()) { - // Pick the first valid file of non-zero size (only 1 file expected in archive) - compression = compression_type::ZIP; - comp_data = raw + file_start; - comp_len = lfh->comp_size; - uncomp_len = lfh->uncomp_size; - break; - } - } - } - } - cdfh_ofs += cdfh_len; - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - } - case compression_type::BZIP2: - if (src.size() > 4) { - auto const* fhdr = reinterpret_cast(raw); - // Check for BZIP2 file signature "BZh1" to "BZh9" - if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && - fhdr->blksz >= '1' && fhdr->blksz <= '9') { - compression = compression_type::BZIP2; - comp_data = raw; - comp_len = src.size(); - uncomp_len = 0; - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - default: CUDF_FAIL("Unsupported compressed stream type"); - } - - CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type"); - - if (uncomp_len <= 0) { - uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume - // ~4:1 compression for initial size - } - - if (compression == compression_type::GZIP || compression == compression_type::ZIP) { - // INFLATE - std::vector dst(uncomp_len); - cpu_inflate_vector(dst, comp_data, comp_len); - return dst; - } - if (compression == compression_type::BZIP2) { - size_t src_ofs = 0; - size_t dst_ofs = 0; - int bz_err = 0; - std::vector dst(uncomp_len); - do { - size_t dst_len = uncomp_len - dst_ofs; - bz_err = cpu_bz2_uncompress(comp_data, comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); - if (bz_err == BZ_OUTBUFF_FULL) { - // TBD: We could infer the compression ratio based on produced/consumed byte counts - // in order to minimize realloc events and over-allocation - dst_ofs = dst_len; - dst_len = uncomp_len + (uncomp_len / 2); - dst.resize(dst_len); - uncomp_len = dst_len; - } else if (bz_err == 0) { - uncomp_len = dst_len; - dst.resize(uncomp_len); - } - } while (bz_err == BZ_OUTBUFF_FULL); - CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream"); - return dst; - } - - CUDF_FAIL("Unsupported compressed stream type"); -} - /** * @brief ZLIB host decompressor (no header) */ @@ -622,5 +504,134 @@ size_t estimate_uncompressed_size(compression_type compression, host_span decompress(compression_type compression, host_span src) +{ + CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); + CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); + + auto raw = src.data(); + uint8_t const* comp_data = nullptr; + size_t comp_len = 0; + size_t uncomp_len = 0; + + switch (compression) { + case compression_type::AUTO: + case compression_type::GZIP: { + gz_archive_s gz; + if (ParseGZArchive(&gz, raw, src.size())) { + compression = compression_type::GZIP; + comp_data = gz.comp_data; + comp_len = gz.comp_len; + uncomp_len = gz.isize; + } + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::ZIP: { + zip_archive_s za; + if (OpenZipArchive(&za, raw, src.size())) { + size_t cdfh_ofs = 0; + for (int i = 0; i < za.eocd->num_entries; i++) { + auto const* cdfh = reinterpret_cast( + reinterpret_cast(za.cdfh) + cdfh_ofs); + int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; + if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { + // Bad cdir + break; + } + // For now, only accept with non-zero file sizes and DEFLATE + if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { + size_t lfh_ofs = cdfh->hdr_ofs; + auto const* lfh = reinterpret_cast(raw + lfh_ofs); + if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && + lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { + if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { + size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; + size_t file_end = file_start + lfh->comp_size; + if (file_end <= src.size()) { + // Pick the first valid file of non-zero size (only 1 file expected in archive) + compression = compression_type::ZIP; + comp_data = raw + file_start; + comp_len = lfh->comp_size; + uncomp_len = lfh->uncomp_size; + break; + } + } + } + } + cdfh_ofs += cdfh_len; + } + } + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::BZIP2: + if (src.size() > 4) { + auto const* fhdr = reinterpret_cast(raw); + // Check for BZIP2 file signature "BZh1" to "BZh9" + if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && + fhdr->blksz >= '1' && fhdr->blksz <= '9') { + compression = compression_type::BZIP2; + comp_data = raw; + comp_len = src.size(); + uncomp_len = 0; + } + } + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + case compression_type::SNAPPY: + uncomp_len = estimate_uncompressed_size(compression, src); + comp_data = raw; + comp_len = src.size(); + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + default: CUDF_FAIL("Unsupported compressed stream type"); + } + + CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type"); + + if (uncomp_len <= 0) { + uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume + // ~4:1 compression for initial size + } + + if (compression == compression_type::GZIP || compression == compression_type::ZIP) { + // INFLATE + std::vector dst(uncomp_len); + cpu_inflate_vector(dst, comp_data, comp_len); + return dst; + } + if (compression == compression_type::BZIP2) { + size_t src_ofs = 0; + size_t dst_ofs = 0; + int bz_err = 0; + std::vector dst(uncomp_len); + do { + size_t dst_len = uncomp_len - dst_ofs; + bz_err = cpu_bz2_uncompress(comp_data, comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); + if (bz_err == BZ_OUTBUFF_FULL) { + // TBD: We could infer the compression ratio based on produced/consumed byte counts + // in order to minimize realloc events and over-allocation + dst_ofs = dst_len; + dst_len = uncomp_len + (uncomp_len / 2); + dst.resize(dst_len); + uncomp_len = dst_len; + } else if (bz_err == 0) { + uncomp_len = dst_len; + dst.resize(uncomp_len); + } + } while (bz_err == BZ_OUTBUFF_FULL); + CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream"); + return dst; + } + if (compression == compression_type::SNAPPY) { + std::vector dst(uncomp_len); + decompress_snappy(src, dst); + return dst; + } + + CUDF_FAIL("Unsupported compressed stream type"); +} + } // namespace io } // namespace cudf