From d1983006590a1054cc76e9376ce016dc9060bf67 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Wed, 6 Nov 2024 09:11:19 +0000 Subject: [PATCH] cleanup; snappy still not working though --- cpp/benchmarks/io/json/json_reader_input.cpp | 2 +- cpp/src/io/comp/comp.cu | 5 +- cpp/src/io/comp/comp.hpp | 18 +- cpp/src/io/comp/uncomp.cpp | 233 ++++++++++--------- cpp/tests/io/json/json_writer.cpp | 3 +- 5 files changed, 137 insertions(+), 124 deletions(-) diff --git a/cpp/benchmarks/io/json/json_reader_input.cpp b/cpp/benchmarks/io/json/json_reader_input.cpp index dc5386d1baf..1663f06a282 100644 --- a/cpp/benchmarks/io/json/json_reader_input.cpp +++ b/cpp/benchmarks/io/json/json_reader_input.cpp @@ -157,5 +157,5 @@ NVBENCH_BENCH_TYPES(BM_json_read_compressed_io, NVBENCH_TYPE_AXES(compression_list, nvbench::enum_type_list)) .set_name("json_read_compressed_io") .set_type_axes_names({"compression_type", "io"}) - .add_int64_power_of_two_axis("data_size", nvbench::range(20, 30, 2)) + .add_int64_power_of_two_axis("data_size", nvbench::range(20, 29, 1)) .set_min_samples(4); diff --git a/cpp/src/io/comp/comp.cu b/cpp/src/io/comp/comp.cu index cd21a1419b4..a35f73d589c 100644 --- a/cpp/src/io/comp/comp.cu +++ b/cpp/src/io/comp/comp.cu @@ -17,6 +17,7 @@ #include "comp.hpp" #include "gpuinflate.hpp" #include "io/utilities/hostdevice_vector.hpp" +#include "nvcomp_adapter.hpp" #include #include @@ -81,8 +82,10 @@ std::vector compress_snappy(host_span src, hd_status[0] = {}; hd_status.host_to_device_async(stream); - gpu_snap(inputs, outputs, hd_status, stream); + // gpu_snap(inputs, outputs, hd_status, stream); + nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream); + stream.synchronize(); hd_status.device_to_host_sync(stream); CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS, "snappy compression failed"); diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index d3b6f0bbdc6..e2dde8f85ec 100644 --- 
a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -27,28 +27,16 @@ namespace cudf { namespace io { /** - * @brief Decompresses a system memory buffer. + * @brief Compresses a system memory buffer. * * @param compression Type of compression of the input data - * @param src Compressed host buffer + * @param src Decompressed host buffer * - * @return Vector containing the Decompressed output + * @return Vector containing the Compressed output */ std::vector compress(compression_type compression, host_span src, rmm::cuda_stream_view stream); -/** - * @brief GZIP header flags - * See https://tools.ietf.org/html/rfc1952 - */ -namespace GZIPHeaderFlag { -constexpr uint8_t ftext = 0x01; // ASCII text hint -constexpr uint8_t fhcrc = 0x02; // Header CRC present -constexpr uint8_t fextra = 0x04; // Extra fields present -constexpr uint8_t fname = 0x08; // Original file name present -constexpr uint8_t fcomment = 0x10; // Comment present -}; // namespace GZIPHeaderFlag - } // namespace io } // namespace cudf diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 681bce8bc0e..60d715fa5ce 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +#include "gpuinflate.hpp" #include "io/utilities/hostdevice_vector.hpp" #include "io_uncomp.hpp" #include "nvcomp_adapter.hpp" @@ -301,7 +302,7 @@ size_t decompress_gzip(host_span src, host_span dst) /** * @brief SNAPPY host decompressor */ -size_t decompress_snappy(host_span src, host_span dst) +size_t host_decompress_snappy(host_span src, host_span dst) { CUDF_EXPECTS(not dst.empty() and src.size() >= 1, "invalid Snappy decompress inputs"); uint32_t uncompressed_size, bytes_left, dst_pos; @@ -382,6 +383,50 @@ size_t decompress_snappy(host_span src, host_span dst) return uncompressed_size; } +/** + * @brief SNAPPY device decompressor + */ +size_t decompress_snappy(host_span src, + host_span dst, + rmm::cuda_stream_view stream) +{ + // Init device span of spans (source) + auto const d_src = + cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref()); + auto hd_srcs = cudf::detail::hostdevice_vector>(1, stream); + hd_srcs[0] = d_src; + hd_srcs.host_to_device_async(stream); + + // Init device span of spans (temporary destination) + auto d_dst = rmm::device_uvector(dst.size(), stream); + auto hd_dsts = cudf::detail::hostdevice_vector>(1, stream); + hd_dsts[0] = d_dst; + hd_dsts.host_to_device_async(stream); + + auto hd_stats = cudf::detail::hostdevice_vector(1, stream); + hd_stats[0] = compression_result{0, compression_status::FAILURE}; + hd_stats.host_to_device_async(stream); + + auto const max_uncomp_page_size = dst.size(); + nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, + hd_srcs, + hd_dsts, + hd_stats, + max_uncomp_page_size, + max_uncomp_page_size, + stream); + + hd_stats.device_to_host_sync(stream); + CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "SNAPPY decompression failed"); + + // Copy temporary output to `dst` + cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written), + device_span{d_dst.data(), hd_stats[0].bytes_written}, + stream); + + return hd_stats[0].bytes_written; +} + 
/** * @brief ZSTD decompressor that uses nvcomp */ @@ -418,97 +463,22 @@ size_t decompress_zstd(host_span src, CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "ZSTD decompression failed"); // Copy temporary output to `dst` - cudf::detail::cuda_memcpy_async( - dst.subspan(0, hd_stats[0].bytes_written), - device_span{d_dst.data(), hd_stats[0].bytes_written}, - stream); + cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written), + device_span{d_dst.data(), hd_stats[0].bytes_written}, + stream); return hd_stats[0].bytes_written; } -size_t decompress(compression_type compression, - host_span src, - host_span dst, - rmm::cuda_stream_view stream) -{ - switch (compression) { - case compression_type::GZIP: return decompress_gzip(src, dst); - case compression_type::ZLIB: return decompress_zlib(src, dst); - case compression_type::SNAPPY: return decompress_snappy(src, dst); - case compression_type::ZSTD: return decompress_zstd(src, dst, stream); - default: CUDF_FAIL("Unsupported compression type"); - } -} - -size_t estimate_uncompressed_size(compression_type compression, host_span src) -{ - auto raw = src.data(); - switch (compression) { - case compression_type::NONE: return src.size(); - case compression_type::GZIP: { - gz_archive_s gz; - if (ParseGZArchive(&gz, src.data(), src.size())) return gz.isize; - } - case compression_type::ZIP: { - zip_archive_s za; - if (OpenZipArchive(&za, src.data(), src.size())) { - size_t cdfh_ofs = 0; - for (int i = 0; i < za.eocd->num_entries; i++) { - auto const* cdfh = reinterpret_cast( - reinterpret_cast(za.cdfh) + cdfh_ofs); - int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; - if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { - // Bad cdir - break; - } - // For now, only accept with non-zero file sizes and DEFLATE - if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { - size_t lfh_ofs = cdfh->hdr_ofs; - auto const* lfh = 
reinterpret_cast(raw + lfh_ofs); - if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && - lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { - if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { - size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; - size_t file_end = file_start + lfh->comp_size; - if (file_end <= src.size()) { - // Pick the first valid file of non-zero size (only 1 file expected in archive) - return lfh->uncomp_size; - } - } - } - } - cdfh_ofs += cdfh_len; - } - } - } - case compression_type::SNAPPY: { - uint32_t uncompressed_size; - auto cur = src.begin(); - auto const end = src.end(); - // Read uncompressed length (varint) - { - uint32_t l = 0, c; - uncompressed_size = 0; - do { - c = *cur++; - auto const lo7 = c & 0x7f; - if (l >= 28 && c > 0xf) { return 0; } - uncompressed_size |= lo7 << l; - l += 7; - } while (c > 0x7f && cur < end); - CUDF_EXPECTS(uncompressed_size != 0 and cur < end, "Destination buffer too small"); - } - return uncompressed_size; - } - default: return 0; - } -} +struct source_properties { + compression_type compression = compression_type::NONE; + uint8_t const* comp_data = nullptr; + size_t comp_len = 0; + size_t uncomp_len = 0; +}; -std::vector decompress(compression_type compression, host_span src) +source_properties get_source_properties(compression_type compression, host_span src) { - CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); - CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); - auto raw = src.data(); uint8_t const* comp_data = nullptr; size_t comp_len = 0; @@ -565,7 +535,7 @@ std::vector decompress(compression_type compression, host_span 4) { auto const* fhdr = reinterpret_cast(raw); // Check for BZIP2 file signature "BZh1" to "BZh9" @@ -579,54 +549,105 @@ std::vector decompress(compression_type compression, host_span= 28 && c > 0xf) { + uncomp_len = 0; + 
break; + } + uncomp_len |= lo7 << l; + l += 7; + } while (c > 0x7f && cur < end); + CUDF_EXPECTS(uncomp_len != 0 and cur < end, "Error in retrieving SNAPPY source properties"); + } + comp_data = raw; + comp_len = src.size(); if (compression != compression_type::AUTO) break; [[fallthrough]]; + } default: CUDF_FAIL("Unsupported compressed stream type"); } - CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type"); + return source_properties{compression, comp_data, comp_len, uncomp_len}; +} + +size_t estimate_uncompressed_size(compression_type compression, host_span src) +{ + auto srcprops = get_source_properties(compression, src); + return srcprops.uncomp_len; +} + +size_t decompress(compression_type compression, + host_span src, + host_span dst, + rmm::cuda_stream_view stream) +{ + switch (compression) { + case compression_type::GZIP: return decompress_gzip(src, dst); + case compression_type::ZLIB: return decompress_zlib(src, dst); + case compression_type::SNAPPY: return decompress_snappy(src, dst, stream); + case compression_type::ZSTD: return decompress_zstd(src, dst, stream); + default: CUDF_FAIL("Unsupported compression type"); + } +} + +std::vector decompress(compression_type compression, host_span src) +{ + CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); + CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); + + auto srcprops = get_source_properties(compression, src); + CUDF_EXPECTS(srcprops.comp_data != nullptr and srcprops.comp_len > 0, + "Unsupported compressed stream type"); - if (uncomp_len <= 0) { - uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume - // ~4:1 compression for initial size + if (srcprops.uncomp_len <= 0) { + srcprops.uncomp_len = + srcprops.comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume + // ~4:1 compression for initial size } if (compression == compression_type::GZIP || 
compression == compression_type::ZIP) { // INFLATE - std::vector dst(uncomp_len); - cpu_inflate_vector(dst, comp_data, comp_len); + std::vector dst(srcprops.uncomp_len); + cpu_inflate_vector(dst, srcprops.comp_data, srcprops.comp_len); return dst; } if (compression == compression_type::BZIP2) { size_t src_ofs = 0; size_t dst_ofs = 0; int bz_err = 0; - std::vector dst(uncomp_len); + std::vector dst(srcprops.uncomp_len); do { - size_t dst_len = uncomp_len - dst_ofs; - bz_err = cpu_bz2_uncompress(comp_data, comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); + size_t dst_len = srcprops.uncomp_len - dst_ofs; + bz_err = cpu_bz2_uncompress( + srcprops.comp_data, srcprops.comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); if (bz_err == BZ_OUTBUFF_FULL) { // TBD: We could infer the compression ratio based on produced/consumed byte counts // in order to minimize realloc events and over-allocation dst_ofs = dst_len; - dst_len = uncomp_len + (uncomp_len / 2); + dst_len = srcprops.uncomp_len + (srcprops.uncomp_len / 2); dst.resize(dst_len); - uncomp_len = dst_len; + srcprops.uncomp_len = dst_len; } else if (bz_err == 0) { - uncomp_len = dst_len; - dst.resize(uncomp_len); + srcprops.uncomp_len = dst_len; + dst.resize(srcprops.uncomp_len); } } while (bz_err == BZ_OUTBUFF_FULL); CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream"); return dst; } if (compression == compression_type::SNAPPY) { - std::vector dst(uncomp_len); - decompress_snappy(src, dst); + std::vector dst(srcprops.uncomp_len); + decompress_snappy(src, dst, cudf::get_default_stream()); return dst; } diff --git a/cpp/tests/io/json/json_writer.cpp b/cpp/tests/io/json/json_writer.cpp index cdd89f372e6..8ac647e694a 100644 --- a/cpp/tests/io/json/json_writer.cpp +++ b/cpp/tests/io/json/json_writer.cpp @@ -45,6 +45,7 @@ struct JsonCompressedWriterTest : public cudf::test::BaseFixture, INSTANTIATE_TEST_SUITE_P(JsonCompressedWriterTest, JsonCompressedWriterTest, ::testing::Values(cudf::io::compression_type::GZIP, 
+ cudf::io::compression_type::SNAPPY, cudf::io::compression_type::NONE)); TEST_F(JsonWriterTest, EmptyInput) @@ -206,7 +207,7 @@ TEST_P(JsonCompressedWriterTest, PlainTable) cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream()); - if (comptype == cudf::io::compression_type::GZIP) { + if (comptype != cudf::io::compression_type::GZIP) { auto decomp_out_buffer = cudf::io::decompress(comptype, cudf::host_span(