Skip to content

Commit

Permalink
cleanup; snappy still not working though
Browse files Browse the repository at this point in the history
  • Loading branch information
shrshi committed Nov 6, 2024
1 parent 1177f4a commit d198300
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 124 deletions.
2 changes: 1 addition & 1 deletion cpp/benchmarks/io/json/json_reader_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,5 @@ NVBENCH_BENCH_TYPES(BM_json_read_compressed_io,
NVBENCH_TYPE_AXES(compression_list, nvbench::enum_type_list<io_type::FILEPATH>))
.set_name("json_read_compressed_io")
.set_type_axes_names({"compression_type", "io"})
.add_int64_power_of_two_axis("data_size", nvbench::range(20, 30, 2))
.add_int64_power_of_two_axis("data_size", nvbench::range(20, 29, 1))
.set_min_samples(4);
5 changes: 4 additions & 1 deletion cpp/src/io/comp/comp.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "comp.hpp"
#include "gpuinflate.hpp"
#include "io/utilities/hostdevice_vector.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
Expand Down Expand Up @@ -81,8 +82,10 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
hd_status[0] = {};
hd_status.host_to_device_async(stream);

gpu_snap(inputs, outputs, hd_status, stream);
// gpu_snap(inputs, outputs, hd_status, stream);
nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

stream.synchronize();
hd_status.device_to_host_sync(stream);
CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
"snappy compression failed");
Expand Down
18 changes: 3 additions & 15 deletions cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,16 @@ namespace cudf {
namespace io {

/**
* @brief Decompresses a system memory buffer.
* @brief Compresses a system memory buffer.
*
* @param compression Type of compression of the input data
* @param src Compressed host buffer
* @param src Decompressed host buffer
*
* @return Vector containing the Decompressed output
* @return Vector containing the Compressed output
*/
std::vector<uint8_t> compress(compression_type compression,
host_span<uint8_t const> src,
rmm::cuda_stream_view stream);

/**
* @brief GZIP header flags
* See https://tools.ietf.org/html/rfc1952
*/
namespace GZIPHeaderFlag {
constexpr uint8_t ftext = 0x01; // ASCII text hint
constexpr uint8_t fhcrc = 0x02; // Header CRC present
constexpr uint8_t fextra = 0x04; // Extra fields present
constexpr uint8_t fname = 0x08; // Original file name present
constexpr uint8_t fcomment = 0x10; // Comment present
}; // namespace GZIPHeaderFlag

} // namespace io
} // namespace cudf
233 changes: 127 additions & 106 deletions cpp/src/io/comp/uncomp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "gpuinflate.hpp"
#include "io/utilities/hostdevice_vector.hpp"
#include "io_uncomp.hpp"
#include "nvcomp_adapter.hpp"
Expand Down Expand Up @@ -301,7 +302,7 @@ size_t decompress_gzip(host_span<uint8_t const> src, host_span<uint8_t> dst)
/**
* @brief SNAPPY host decompressor
*/
size_t decompress_snappy(host_span<uint8_t const> src, host_span<uint8_t> dst)
size_t host_decompress_snappy(host_span<uint8_t const> src, host_span<uint8_t> dst)
{
CUDF_EXPECTS(not dst.empty() and src.size() >= 1, "invalid Snappy decompress inputs");
uint32_t uncompressed_size, bytes_left, dst_pos;
Expand Down Expand Up @@ -382,6 +383,50 @@ size_t decompress_snappy(host_span<uint8_t const> src, host_span<uint8_t> dst)
return uncompressed_size;
}

/**
* @brief SNAPPY device decompressor
*/
size_t decompress_snappy(host_span<uint8_t const> src,
host_span<uint8_t> dst,
rmm::cuda_stream_view stream)
{
// Init device span of spans (source)
auto const d_src =
cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());
auto hd_srcs = cudf::detail::hostdevice_vector<device_span<uint8_t const>>(1, stream);
hd_srcs[0] = d_src;
hd_srcs.host_to_device_async(stream);

// Init device span of spans (temporary destination)
auto d_dst = rmm::device_uvector<uint8_t>(dst.size(), stream);
auto hd_dsts = cudf::detail::hostdevice_vector<device_span<uint8_t>>(1, stream);
hd_dsts[0] = d_dst;
hd_dsts.host_to_device_async(stream);

auto hd_stats = cudf::detail::hostdevice_vector<compression_result>(1, stream);
hd_stats[0] = compression_result{0, compression_status::FAILURE};
hd_stats.host_to_device_async(stream);

auto const max_uncomp_page_size = dst.size();
nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
hd_srcs,
hd_dsts,
hd_stats,
max_uncomp_page_size,
max_uncomp_page_size,
stream);

hd_stats.device_to_host_sync(stream);
CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "ZSTD decompression failed");

// Copy temporary output to `dst`
cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written),
device_span<uint8_t const>{d_dst.data(), hd_stats[0].bytes_written},
stream);

return hd_stats[0].bytes_written;
}

/**
* @brief ZSTD decompressor that uses nvcomp
*/
Expand Down Expand Up @@ -418,97 +463,22 @@ size_t decompress_zstd(host_span<uint8_t const> src,
CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "ZSTD decompression failed");

// Copy temporary output to `dst`
cudf::detail::cuda_memcpy_async(
dst.subspan(0, hd_stats[0].bytes_written),
device_span<uint8_t const>{d_dst.data(), hd_stats[0].bytes_written},
stream);
cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written),
device_span<uint8_t const>{d_dst.data(), hd_stats[0].bytes_written},
stream);

return hd_stats[0].bytes_written;
}

size_t decompress(compression_type compression,
host_span<uint8_t const> src,
host_span<uint8_t> dst,
rmm::cuda_stream_view stream)
{
switch (compression) {
case compression_type::GZIP: return decompress_gzip(src, dst);
case compression_type::ZLIB: return decompress_zlib(src, dst);
case compression_type::SNAPPY: return decompress_snappy(src, dst);
case compression_type::ZSTD: return decompress_zstd(src, dst, stream);
default: CUDF_FAIL("Unsupported compression type");
}
}

size_t estimate_uncompressed_size(compression_type compression, host_span<uint8_t const> src)
{
auto raw = src.data();
switch (compression) {
case compression_type::NONE: return src.size();
case compression_type::GZIP: {
gz_archive_s gz;
if (ParseGZArchive(&gz, src.data(), src.size())) return gz.isize;
}
case compression_type::ZIP: {
zip_archive_s za;
if (OpenZipArchive(&za, src.data(), src.size())) {
size_t cdfh_ofs = 0;
for (int i = 0; i < za.eocd->num_entries; i++) {
auto const* cdfh = reinterpret_cast<zip_cdfh_s const*>(
reinterpret_cast<uint8_t const*>(za.cdfh) + cdfh_ofs);
int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len;
if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) {
// Bad cdir
break;
}
// For now, only accept with non-zero file sizes and DEFLATE
if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) {
size_t lfh_ofs = cdfh->hdr_ofs;
auto const* lfh = reinterpret_cast<zip_lfh_s const*>(raw + lfh_ofs);
if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 &&
lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) {
if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) {
size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len;
size_t file_end = file_start + lfh->comp_size;
if (file_end <= src.size()) {
// Pick the first valid file of non-zero size (only 1 file expected in archive)
return lfh->uncomp_size;
}
}
}
}
cdfh_ofs += cdfh_len;
}
}
}
case compression_type::SNAPPY: {
uint32_t uncompressed_size;
auto cur = src.begin();
auto const end = src.end();
// Read uncompressed length (varint)
{
uint32_t l = 0, c;
uncompressed_size = 0;
do {
c = *cur++;
auto const lo7 = c & 0x7f;
if (l >= 28 && c > 0xf) { return 0; }
uncompressed_size |= lo7 << l;
l += 7;
} while (c > 0x7f && cur < end);
CUDF_EXPECTS(uncompressed_size != 0 and cur < end, "Destination buffer too small");
}
return uncompressed_size;
}
default: return 0;
}
}
struct source_properties {
compression_type compression = compression_type::NONE;
uint8_t const* comp_data = nullptr;
size_t comp_len = 0;
size_t uncomp_len = 0;
};

std::vector<uint8_t> decompress(compression_type compression, host_span<uint8_t const> src)
source_properties get_source_properties(compression_type compression, host_span<uint8_t const> src)
{
CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr");
CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0");

auto raw = src.data();
uint8_t const* comp_data = nullptr;
size_t comp_len = 0;
Expand Down Expand Up @@ -565,7 +535,7 @@ std::vector<uint8_t> decompress(compression_type compression, host_span<uint8_t
if (compression != compression_type::AUTO) break;
[[fallthrough]];
}
case compression_type::BZIP2:
case compression_type::BZIP2: {
if (src.size() > 4) {
auto const* fhdr = reinterpret_cast<bz2_file_header_s const*>(raw);
// Check for BZIP2 file signature "BZh1" to "BZh9"
Expand All @@ -579,54 +549,105 @@ std::vector<uint8_t> decompress(compression_type compression, host_span<uint8_t
}
if (compression != compression_type::AUTO) break;
[[fallthrough]];
case compression_type::SNAPPY:
uncomp_len = estimate_uncompressed_size(compression, src);
comp_data = raw;
comp_len = src.size();
}
case compression_type::SNAPPY: {
uncomp_len = 0;
auto cur = src.begin();
auto const end = src.end();
// Read uncompressed length (varint)
{
uint32_t l = 0, c;
do {
c = *cur++;
auto const lo7 = c & 0x7f;
if (l >= 28 && c > 0xf) {
uncomp_len = 0;
break;
}
uncomp_len |= lo7 << l;
l += 7;
} while (c > 0x7f && cur < end);
CUDF_EXPECTS(uncomp_len != 0 and cur < end, "Error in retrieving SNAPPY source properties");
}
comp_data = raw;
comp_len = src.size();
if (compression != compression_type::AUTO) break;
[[fallthrough]];
}
default: CUDF_FAIL("Unsupported compressed stream type");
}

CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type");
return source_properties{compression, comp_data, comp_len, uncomp_len};
}

size_t estimate_uncompressed_size(compression_type compression, host_span<uint8_t const> src)
{
auto srcprops = get_source_properties(compression, src);
return srcprops.uncomp_len;
}

size_t decompress(compression_type compression,
host_span<uint8_t const> src,
host_span<uint8_t> dst,
rmm::cuda_stream_view stream)
{
switch (compression) {
case compression_type::GZIP: return decompress_gzip(src, dst);
case compression_type::ZLIB: return decompress_zlib(src, dst);
case compression_type::SNAPPY: return decompress_snappy(src, dst, stream);
case compression_type::ZSTD: return decompress_zstd(src, dst, stream);
default: CUDF_FAIL("Unsupported compression type");
}
}

std::vector<uint8_t> decompress(compression_type compression, host_span<uint8_t const> src)
{
CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr");
CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0");

auto srcprops = get_source_properties(compression, src);
CUDF_EXPECTS(srcprops.comp_data != nullptr and srcprops.comp_len > 0,
"Unsupported compressed stream type");

if (uncomp_len <= 0) {
uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume
// ~4:1 compression for initial size
if (srcprops.uncomp_len <= 0) {
srcprops.uncomp_len =
srcprops.comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume
// ~4:1 compression for initial size
}

if (compression == compression_type::GZIP || compression == compression_type::ZIP) {
// INFLATE
std::vector<uint8_t> dst(uncomp_len);
cpu_inflate_vector(dst, comp_data, comp_len);
std::vector<uint8_t> dst(srcprops.uncomp_len);
cpu_inflate_vector(dst, srcprops.comp_data, srcprops.comp_len);
return dst;
}
if (compression == compression_type::BZIP2) {
size_t src_ofs = 0;
size_t dst_ofs = 0;
int bz_err = 0;
std::vector<uint8_t> dst(uncomp_len);
std::vector<uint8_t> dst(srcprops.uncomp_len);
do {
size_t dst_len = uncomp_len - dst_ofs;
bz_err = cpu_bz2_uncompress(comp_data, comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs);
size_t dst_len = srcprops.uncomp_len - dst_ofs;
bz_err = cpu_bz2_uncompress(
srcprops.comp_data, srcprops.comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs);
if (bz_err == BZ_OUTBUFF_FULL) {
// TBD: We could infer the compression ratio based on produced/consumed byte counts
// in order to minimize realloc events and over-allocation
dst_ofs = dst_len;
dst_len = uncomp_len + (uncomp_len / 2);
dst_len = srcprops.uncomp_len + (srcprops.uncomp_len / 2);
dst.resize(dst_len);
uncomp_len = dst_len;
srcprops.uncomp_len = dst_len;
} else if (bz_err == 0) {
uncomp_len = dst_len;
dst.resize(uncomp_len);
srcprops.uncomp_len = dst_len;
dst.resize(srcprops.uncomp_len);
}
} while (bz_err == BZ_OUTBUFF_FULL);
CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream");
return dst;
}
if (compression == compression_type::SNAPPY) {
std::vector<uint8_t> dst(uncomp_len);
decompress_snappy(src, dst);
std::vector<uint8_t> dst(srcprops.uncomp_len);
decompress_snappy(src, dst, cudf::get_default_stream());
return dst;
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/tests/io/json/json_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct JsonCompressedWriterTest : public cudf::test::BaseFixture,
INSTANTIATE_TEST_SUITE_P(JsonCompressedWriterTest,
JsonCompressedWriterTest,
::testing::Values(cudf::io::compression_type::GZIP,
cudf::io::compression_type::SNAPPY,
cudf::io::compression_type::NONE));

TEST_F(JsonWriterTest, EmptyInput)
Expand Down Expand Up @@ -206,7 +207,7 @@ TEST_P(JsonCompressedWriterTest, PlainTable)

cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream());

if (comptype == cudf::io::compression_type::GZIP) {
if (comptype != cudf::io::compression_type::GZIP) {
auto decomp_out_buffer =
cudf::io::decompress(comptype,
cudf::host_span<uint8_t const>(
Expand Down

0 comments on commit d198300

Please sign in to comment.