Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
shrshi committed Nov 4, 2024
1 parent fd69384 commit 3e3c8c5
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 35 deletions.
18 changes: 12 additions & 6 deletions cpp/benchmarks/io/json/json_reader_input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,20 @@ void json_read_common(cuio_source_sink_pair& source_sink,
state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
}

/**
 * @brief Generates a random table and writes it as JSON to the given sink.
 *
 * The table is produced by `create_random_table`, using `cycle_dtypes(dtypes, num_cols)` for the
 * column types and `table_size_bytes{data_size}` for the target size (`num_cols` and `data_size`
 * are defined outside this function). The writer options are built with `na_rep("null")`,
 * `rows_per_chunk(100'000)` and the requested compression type before calling `write_json`.
 *
 * @param sink Destination handed to the JSON writer options builder
 * @param dtypes Type ids passed to `cycle_dtypes` to choose the column types
 * @param comptype Compression type forwarded to the writer options; defaults to NONE
 * @return Number of rows in the view of the generated table
 */
cudf::size_type json_write_bm_data(cudf::io::sink_info sink,
std::vector<cudf::type_id> const& dtypes,
cudf::io::compression_type comptype = cudf::io::compression_type::NONE)
cudf::size_type json_write_bm_data(
cudf::io::sink_info sink,
std::vector<cudf::type_id> const& dtypes,
cudf::io::compression_type comptype = cudf::io::compression_type::NONE)
{
auto const tbl = create_random_table(
cycle_dtypes(dtypes, num_cols), table_size_bytes{data_size}, data_profile_builder());
auto const view = tbl->view();

cudf::io::json_writer_options const write_opts =
cudf::io::json_writer_options::builder(sink, view).na_rep("null").rows_per_chunk(100'000).compression(comptype);
cudf::io::json_writer_options::builder(sink, view)
.na_rep("null")
.rows_per_chunk(100'000)
.compression(comptype);
cudf::io::write_json(write_opts);
return view.num_rows();
}
Expand All @@ -90,7 +94,8 @@ void BM_json_read_io(nvbench::state& state, nvbench::type_list<nvbench::enum_typ
}

template <cudf::io::compression_type comptype, io_type IO>
void BM_json_read_compressed_io(nvbench::state& state, nvbench::type_list<nvbench::enum_type<comptype>, nvbench::enum_type<IO>>)
void BM_json_read_compressed_io(
nvbench::state& state, nvbench::type_list<nvbench::enum_type<comptype>, nvbench::enum_type<IO>>)
{
cuio_source_sink_pair source_sink(IO);
auto const d_type = get_type_or_group({static_cast<int32_t>(data_type::INTEGRAL),
Expand Down Expand Up @@ -143,7 +148,8 @@ NVBENCH_BENCH_TYPES(BM_json_read_io, NVBENCH_TYPE_AXES(io_list))
.set_type_axes_names({"io"})
.set_min_samples(4);

NVBENCH_BENCH_TYPES(BM_json_read_compressed_io, NVBENCH_TYPE_AXES(compression_list, nvbench::enum_type_list<io_type::FILEPATH>))
NVBENCH_BENCH_TYPES(BM_json_read_compressed_io,
NVBENCH_TYPE_AXES(compression_list, nvbench::enum_type_list<io_type::FILEPATH>))
.set_name("json_read_compressed_io")
.set_type_axes_names({"compression_type", "io"})
.set_min_samples(4);
27 changes: 15 additions & 12 deletions cpp/src/io/comp/comp.cu
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cudf/utilities/span.hpp>

#include <zlib.h> // compress

#include <cstring> // memset

namespace cudf {
Expand Down Expand Up @@ -128,19 +129,20 @@ struct zip_archive_s {
std::vector<std::uint8_t> compress_gzip(host_span<uint8_t const> src, rmm::cuda_stream_view stream)
{
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.avail_in = src.size();
zs.next_in = reinterpret_cast<unsigned char*>(const_cast<unsigned char*>(src.data()));
zs.next_in = reinterpret_cast<unsigned char*>(const_cast<unsigned char*>(src.data()));

std::vector<uint8_t> dst(src.size());
zs.avail_out = src.size();
zs.next_out = dst.data();
zs.next_out = dst.data();

int windowbits = 15;
int windowbits = 15;
int gzip_encoding = 16;
int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowbits | gzip_encoding, 8, Z_DEFAULT_STRATEGY);
int ret = deflateInit2(
&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowbits | gzip_encoding, 8, Z_DEFAULT_STRATEGY);
CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression initialization failed.");

deflate(&zs, Z_FINISH);
Expand All @@ -153,7 +155,8 @@ std::vector<std::uint8_t> compress_gzip(host_span<uint8_t const> src, rmm::cuda_
/**
* @brief SNAPPY host compressor
*/
std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src, rmm::cuda_stream_view stream)
std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
rmm::cuda_stream_view stream)
{
// TODO: to be completed
rmm::device_uvector<std::uint8_t> d_src(src.size(), stream);
Expand All @@ -165,18 +168,18 @@ std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src, rmm::cud
rmm::device_uvector<compression_result> d_status(1, stream);

/*
gpu_snap(device_span<device_span<std::uint8_t const> const>{d_src},
gpu_snap(device_span<device_span<std::uint8_t const> const>{d_src},
device_span<device_span<std::uint8_t> const>{d_dst}, d_status, stream);
*/

std::vector<uint8_t> dst(d_dst.size());
cudf::detail::cuda_memcpy(host_span<uint8_t>{dst}, device_span<uint8_t const>{d_dst}, stream);
return dst;
}

std::vector<std::uint8_t> compress(compression_type compression,
host_span<uint8_t const> src,
rmm::cuda_stream_view stream)
host_span<uint8_t const> src,
rmm::cuda_stream_view stream)
{
switch (compression) {
case compression_type::GZIP: return compress_gzip(src, stream);
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ namespace io {
*
* @return Vector containing the compressed output
*/
std::vector<uint8_t> compress(compression_type compression, host_span<uint8_t const> src, rmm::cuda_stream_view stream);
std::vector<uint8_t> compress(compression_type compression,
host_span<uint8_t const> src,
rmm::cuda_stream_view stream);

/**
* @brief GZIP header flags
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/comp/io_uncomp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ constexpr uint8_t fcomment = 0x10; // Comment present
}; // namespace GZIPHeaderFlag

} // namespace io
} // namespace cudf
} // namespace CUDF_EXPORT cudf
19 changes: 11 additions & 8 deletions cpp/src/io/json/write_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
* @brief cuDF-IO JSON writer implementation
*/

#include "io/comp/comp.hpp"
#include "io/csv/durations.hpp"
#include "io/utilities/parsing_utils.cuh"
#include "io/comp/comp.hpp"
#include "lists/utilities.hpp"

#include <cudf/column/column_device_view.cuh>
Expand Down Expand Up @@ -830,9 +830,9 @@ void write_chunked(data_sink* out_sink,
}

void write_json_helper(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
std::vector<column_name_info> user_column_names = [&]() {
Expand Down Expand Up @@ -938,13 +938,16 @@ void write_json_helper(data_sink* out_sink,
void write_json(data_sink* out_sink,
table_view const& table,
json_writer_options const& options,
rmm::cuda_stream_view stream) {

if(options.get_compression() != compression_type::NONE) {
rmm::cuda_stream_view stream)
{
if (options.get_compression() != compression_type::NONE) {
std::vector<char> hbuf;
auto hbuf_sink_ptr = data_sink::create(&hbuf);
write_json_helper(hbuf_sink_ptr.get(), table, options, stream);
auto comp_hbuf = compress(options.get_compression(), host_span<uint8_t>(reinterpret_cast<uint8_t*>(hbuf.data()), hbuf.size()), stream);
auto comp_hbuf =
compress(options.get_compression(),
host_span<uint8_t>(reinterpret_cast<uint8_t*>(hbuf.data()), hbuf.size()),
stream);
out_sink->host_write(comp_hbuf.data(), comp_hbuf.size());
return;
}
Expand Down
20 changes: 13 additions & 7 deletions cpp/tests/io/json/json_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/debug_utilities.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/iterator_utilities.hpp>
#include <cudf_test/table_utilities.hpp>
#include <cudf_test/testing_main.hpp>
Expand All @@ -39,12 +39,13 @@ struct JsonWriterTest : public cudf::test::BaseFixture {};
* @brief Test fixture for parametrized JSON writer tests
*/
struct JsonCompressedWriterTest : public cudf::test::BaseFixture,
public testing::WithParamInterface<cudf::io::compression_type> {};
public testing::WithParamInterface<cudf::io::compression_type> {};

// Parametrize qualifying JSON tests for multiple compression types
INSTANTIATE_TEST_SUITE_P(JsonCompressedWriterTest,
JsonCompressedWriterTest,
::testing::Values(cudf::io::compression_type::GZIP, cudf::io::compression_type::NONE));
::testing::Values(cudf::io::compression_type::GZIP,
cudf::io::compression_type::NONE));

TEST_F(JsonWriterTest, EmptyInput)
{
Expand Down Expand Up @@ -206,15 +207,20 @@ TEST_P(JsonCompressedWriterTest, PlainTable)
cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream());

if (comptype == cudf::io::compression_type::GZIP) {
auto decomp_out_buffer = cudf::io::decompress(comptype,
cudf::host_span<uint8_t const>(reinterpret_cast<uint8_t*>(out_buffer.data()), out_buffer.size()));
auto decomp_out_buffer =
cudf::io::decompress(comptype,
cudf::host_span<uint8_t const>(
reinterpret_cast<uint8_t*>(out_buffer.data()), out_buffer.size()));
std::string const expected =
R"([{"col1":"a","col2":"d","col3":1,"col4":1.5,"col5":null},{"col1":"b","col2":"e","col3":2,"col4":2.5,"col5":2},{"col1":"c","col2":"f","col3":3,"col4":3.5,"col5":null}])";
EXPECT_EQ(expected, std::string(reinterpret_cast<char*>(decomp_out_buffer.data()), decomp_out_buffer.size()));
EXPECT_EQ(
expected,
std::string(reinterpret_cast<char*>(decomp_out_buffer.data()), decomp_out_buffer.size()));
}

cudf::io::json_reader_options json_parser_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{out_buffer.data(), out_buffer.size()})
cudf::io::json_reader_options::builder(
cudf::io::source_info{out_buffer.data(), out_buffer.size()})
.lines(false)
.compression(comptype);

Expand Down

0 comments on commit 3e3c8c5

Please sign in to comment.