Skip to content

Commit

Permalink
added snappy support
Browse files Browse the repository at this point in the history
  • Loading branch information
shrshi committed Nov 5, 2024
1 parent c314992 commit a00bf62
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 224 deletions.
5 changes: 3 additions & 2 deletions cpp/benchmarks/io/json/json_reader_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ using d_type_list = nvbench::enum_type_list<data_type::INTEGRAL,
using io_list =
nvbench::enum_type_list<io_type::FILEPATH, io_type::HOST_BUFFER, io_type::DEVICE_BUFFER>;

using compression_list =
nvbench::enum_type_list<cudf::io::compression_type::GZIP, cudf::io::compression_type::NONE>;
using compression_list = nvbench::enum_type_list<cudf::io::compression_type::GZIP,
cudf::io::compression_type::SNAPPY,
cudf::io::compression_type::NONE>;

NVBENCH_BENCH_TYPES(BM_json_read_data_type,
NVBENCH_TYPE_AXES(d_type_list, nvbench::enum_type_list<io_type::DEVICE_BUFFER>))
Expand Down
122 changes: 18 additions & 104 deletions cpp/src/io/comp/comp.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "comp.hpp"
#include "gpuinflate.hpp"
#include "io/utilities/hostdevice_vector.hpp"

#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
Expand All @@ -30,99 +31,6 @@
namespace cudf {
namespace io {

#pragma pack(push, 1)

struct gz_file_header_s {
uint8_t id1; // 0x1f
uint8_t id2; // 0x8b
uint8_t comp_mthd; // compression method (0-7=reserved, 8=deflate)
uint8_t flags; // flags (GZIPHeaderFlag)
uint8_t mtime[4]; // If non-zero: modification time (Unix format) // NOLINT
uint8_t xflags; // Extra compressor-specific flags
uint8_t os; // OS id
};

struct zip_eocd_s // end of central directory
{
uint32_t sig; // 0x0605'4b50
uint16_t disk_id; // number of this disk
uint16_t start_disk; // number of the disk with the start of the central directory
uint16_t num_entries; // number of entries in the central dir on this disk
uint16_t total_entries; // total number of entries in the central dir
uint32_t cdir_size; // size of the central directory
uint32_t cdir_offset; // offset of start of central directory with respect to the starting disk
// number uint16_t comment_len; // comment length (excluded from struct)
};

struct zip64_eocdl // end of central dir locator
{
uint32_t sig; // 0x0706'4b50
uint32_t disk_start; // number of the disk with the start of the zip64 end of central directory
uint64_t eocdr_ofs; // relative offset of the zip64 end of central directory record
uint32_t num_disks; // total number of disks
};

struct zip_cdfh_s // central directory file header
{
uint32_t sig; // 0x0201'4b50
uint16_t ver; // version made by
uint16_t min_ver; // version needed to extract
uint16_t gp_flags; // general purpose bit flag
uint16_t comp_method; // compression method
uint16_t file_time; // last mod file time
uint16_t file_date; // last mod file date
uint32_t crc32; // crc - 32
uint32_t comp_size; // compressed size
uint32_t uncomp_size; // uncompressed size
uint16_t fname_len; // filename length
uint16_t extra_len; // extra field length
uint16_t comment_len; // file comment length
uint16_t start_disk; // disk number start
uint16_t int_fattr; // internal file attributes
uint32_t ext_fattr; // external file attributes
uint32_t hdr_ofs; // relative offset of local header
};

struct zip_lfh_s {
uint32_t sig; // 0x0403'4b50
uint16_t ver; // version needed to extract
uint16_t gp_flags; // general purpose bit flag
uint16_t comp_method; // compression method
uint16_t file_time; // last mod file time
uint16_t file_date; // last mod file date
uint32_t crc32; // crc - 32
uint32_t comp_size; // compressed size
uint32_t uncomp_size; // uncompressed size
uint16_t fname_len; // filename length
uint16_t extra_len; // extra field length
};

struct bz2_file_header_s {
uint8_t sig[3]; // "BZh" // NOLINT
uint8_t blksz; // block size 1..9 in 100kB units (post-RLE)
};

#pragma pack(pop)

struct gz_archive_s {
gz_file_header_s const* fhdr;
uint16_t hcrc16; // header crc16 if present
uint16_t xlen;
uint8_t const* fxtra; // xlen bytes (optional)
uint8_t const* fname; // zero-terminated original filename if present
uint8_t const* fcomment; // zero-terminated comment if present
uint8_t const* comp_data; // compressed data
size_t comp_len; // Compressed data length
uint32_t crc32; // CRC32 of uncompressed data
uint32_t isize; // Input size modulo 2^32
};

struct zip_archive_s {
zip_eocd_s const* eocd; // end of central directory
zip64_eocdl const* eocdl; // end of central dir locator (optional)
zip_cdfh_s const* cdfh; // start of central directory file headers
};

/**
* @brief GZIP host compressor (includes header)
*/
Expand Down Expand Up @@ -153,25 +61,31 @@ std::vector<std::uint8_t> compress_gzip(host_span<uint8_t const> src, rmm::cuda_
}

/**
* @brief SNAPPY host decompressor
* @brief SNAPPY device compressor
*/
std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
rmm::cuda_stream_view stream)
{
// TODO: to be completed
rmm::device_uvector<std::uint8_t> d_src(src.size(), stream);
cudf::detail::cuda_memcpy_async(device_span<uint8_t>{d_src}, src, stream);
rmm::device_uvector<device_span<std::uint8_t const> const> d_srcspan(1, stream);
rmm::device_uvector<uint8_t> d_src(src.size(), stream);
rmm::device_uvector<uint8_t> d_dst(src.size(), stream);

cudf::detail::hostdevice_vector<device_span<uint8_t const>> inputs(1, stream);
inputs[0] = d_src;
inputs.host_to_device_async(stream);

rmm::device_uvector<std::uint8_t> d_dst(src.size(), stream);
cudf::detail::hostdevice_vector<device_span<uint8_t>> outputs(1, stream);
outputs[0] = d_dst;
outputs.host_to_device_async(stream);

rmm::device_uvector<compression_result> d_status(1, stream);
cudf::detail::hostdevice_vector<cudf::io::compression_result> hd_status(1, stream);
hd_status[0] = {};
hd_status.host_to_device_async(stream);

/*
gpu_snap(device_span<device_span<std::uint8_t const> const>{d_src},
device_span<device_span<std::uint8_t> const>{d_dst}, d_status, stream);
*/
gpu_snap(inputs, outputs, hd_status, stream);

hd_status.device_to_host_sync(stream);
CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
"snappy compression failed");
std::vector<uint8_t> dst(d_dst.size());
cudf::detail::cuda_memcpy(host_span<uint8_t>{dst}, device_span<uint8_t const>{d_dst}, stream);
return dst;
Expand Down
Loading

0 comments on commit a00bf62

Please sign in to comment.