diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 3b715aff7..77e896160 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -287,11 +287,19 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { extraction_options.add(input_options); po::options_description decompression_options("Decompression Options"); + // clang-format off decompression_options.add_options()( "ordered", po::bool_switch(&m_ordered_decompression), "Enable decompression in ascending timestamp order for this archive" + )( + "ordered-chunk-size", + po::value(&m_ordered_chunk_size) + ->default_value(m_ordered_chunk_size), + "Number of records to include in each output file when decompressing records " + "in ascending timestamp order" ); + // clang-format on extraction_options.add(decompression_options); po::positional_options_description positional_options; @@ -336,6 +344,11 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { if (m_output_dir.empty()) { throw std::invalid_argument("No output directory specified"); } + + if (0 != m_ordered_chunk_size && false == m_ordered_decompression) { + throw std::invalid_argument("ordered-chunk-size must be used with ordered argument" + ); + } } else if ((char)Command::Search == command_input) { std::string archives_dir; std::string query; diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 4c367509a..0f3d8c556 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -106,6 +106,8 @@ class CommandLineArguments { bool get_ordered_decompression() const { return m_ordered_decompression; } + size_t get_ordered_chunk_size() const { return m_ordered_chunk_size; } + private: // Methods /** @@ -170,6 +172,7 @@ class CommandLineArguments { size_t m_max_document_size{512ULL * 1024 * 1024}; // 512 MB bool m_structurize_arrays{false}; bool m_ordered_decompression{false}; + size_t m_ordered_chunk_size{0}; // Metadata db variables std::optional m_metadata_db_config; diff --git a/components/core/src/clp_s/JsonConstructor.cpp b/components/core/src/clp_s/JsonConstructor.cpp index e48d370d1..68151a1a7 100644 --- a/components/core/src/clp_s/JsonConstructor.cpp +++ b/components/core/src/clp_s/JsonConstructor.cpp @@ -16,7 +16,8 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_output_dir(option.output_dir), m_archives_dir(option.archives_dir), m_ordered{option.ordered}, - m_archive_id(option.archive_id) { + m_archive_id(option.archive_id), + m_ordered_chunk_size(option.ordered_chunk_size) { std::error_code error_code; if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) { throw OperationFailed( @@ -44,24 +45,25 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) } void JsonConstructor::store() { - FileWriter writer; - // TODO: change this when doing chunking - writer.open(m_output_dir + "/original", FileWriter::OpenMode::CreateIfNonexistentForAppending); - m_archive_reader = std::make_unique(); m_archive_reader->open(m_archives_dir, m_archive_id); m_archive_reader->read_dictionaries_and_metadata(); if (false == m_ordered) { + FileWriter writer; + writer.open( + m_output_dir + "/original", + FileWriter::OpenMode::CreateIfNonexistentForAppending + ); m_archive_reader->store(writer); + + writer.close(); } else { - construct_in_order(writer); + construct_in_order(); } m_archive_reader->close(); - - writer.close(); } -void JsonConstructor::construct_in_order(FileWriter& writer) { +void JsonConstructor::construct_in_order() { std::string buffer; auto tables = m_archive_reader->read_all_tables(); using ReaderPointer = std::shared_ptr; @@ -72,15 +74,59 @@ void JsonConstructor::construct_in_order(FileWriter& writer) { // Clear tables vector so that memory gets deallocated after we have marshalled all records for // a given table tables.clear(); + + epochtime_t first_timestamp{0}; + epochtime_t last_timestamp{0}; + size_t num_records_marshalled{0}; + auto src_path = std::filesystem::path(m_output_dir) / m_archive_id; + FileWriter writer; + writer.open(src_path, FileWriter::OpenMode::CreateForWriting); + + auto finalize_chunk = [&](bool open_new_writer) { + writer.close(); + std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_" + + std::to_string(last_timestamp) + ".jsonl"; + auto new_file_path = std::filesystem::path(new_file_name); + std::error_code ec; + std::filesystem::rename(src_path, new_file_path, ec); + if (ec) { + throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message()); + } + + if (open_new_writer) { + writer.open(src_path, FileWriter::OpenMode::CreateForWriting); + } + }; + while (false == record_queue.empty()) { ReaderPointer next = record_queue.top(); record_queue.pop(); + last_timestamp = next->get_next_timestamp(); + if (0 == num_records_marshalled) { + first_timestamp = last_timestamp; + } next->get_next_message(buffer); if (false == next->done()) { record_queue.emplace(std::move(next)); } writer.write(buffer.c_str(), buffer.length()); + num_records_marshalled += 1; + + if (0 != m_ordered_chunk_size && num_records_marshalled >= m_ordered_chunk_size) { + finalize_chunk(true); + num_records_marshalled = 0; + } + } + + if (num_records_marshalled > 0) { + finalize_chunk(false); + } else { + writer.close(); + std::error_code ec; + std::filesystem::remove(src_path, ec); + if (ec) { + throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message()); + } } - writer.close(); } } // namespace clp_s diff --git a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp index 0324924b9..22a2daf59 100644 --- a/components/core/src/clp_s/JsonConstructor.hpp +++ b/components/core/src/clp_s/JsonConstructor.hpp @@ -20,6 +20,7 @@ struct JsonConstructorOption { std::string archive_id; std::string output_dir; bool ordered; + size_t ordered_chunk_size; }; class JsonConstructor { @@ -55,14 +56,14 @@ class JsonConstructor { /** * Reads all of the tables from m_archive_reader and writes all of the records * they contain to writer in timestamp order. - * @param writer */ - void construct_in_order(FileWriter& writer); + void construct_in_order(); std::string m_archives_dir; std::string m_archive_id; std::string m_output_dir; bool m_ordered{false}; + size_t m_ordered_chunk_size{0}; std::unique_ptr m_archive_reader; }; diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 74235e0e1..d01ed0fe0 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -273,6 +273,7 @@ int main(int argc, char const* argv[]) { option.output_dir = command_line_arguments.get_output_dir(); option.ordered = command_line_arguments.get_ordered_decompression(); option.archives_dir = archives_dir; + option.ordered_chunk_size = command_line_arguments.get_ordered_chunk_size(); try { auto const& archive_id = command_line_arguments.get_archive_id(); if (false == archive_id.empty()) {