diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index c8cf08b22..948a9d701 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -2,17 +2,47 @@ add_subdirectory(search/kql) set( CLP_SOURCES + ../clp/BufferReader.cpp + ../clp/BufferReader.hpp ../clp/cli_utils.cpp ../clp/cli_utils.hpp ../clp/database_utils.cpp ../clp/database_utils.hpp ../clp/Defs.h ../clp/ErrorCode.hpp + ../clp/ffi/KeyValuePairLogEvent.cpp + ../clp/ffi/KeyValuePairLogEvent.hpp + ../clp/ffi/SchemaTree.cpp + ../clp/ffi/SchemaTree.hpp + ../clp/ffi/SchemaTreeNode.hpp + ../clp/ffi/Value.hpp + ../clp/ffi/ir_stream/Deserializer.cpp + ../clp/ffi/ir_stream/Deserializer.hpp + ../clp/ffi/ir_stream/Serializer.cpp + ../clp/ffi/ir_stream/Serializer.hpp + ../clp/ffi/ir_stream/decoding_methods.cpp + ../clp/ffi/ir_stream/decoding_methods.hpp + ../clp/ffi/ir_stream/encoding_methods.cpp + ../clp/ffi/ir_stream/encoding_methods.hpp + ../clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp + ../clp/ffi/ir_stream/ir_unit_deserialization_methods.hpp + ../clp/ffi/ir_stream/protocol_constants.hpp + ../clp/ffi/ir_stream/utils.cpp + ../clp/ffi/ir_stream/utils.hpp + ../clp/ffi/utils.cpp + ../clp/ffi/utils.hpp + ../clp/FileDescriptor.cpp + ../clp/FileDescriptor.hpp ../clp/GlobalMetadataDB.hpp ../clp/GlobalMetadataDBConfig.cpp ../clp/GlobalMetadataDBConfig.hpp ../clp/GlobalMySQLMetadataDB.cpp ../clp/GlobalMySQLMetadataDB.hpp + ../clp/ir/EncodedTextAst.cpp + ../clp/ir/EncodedTextAst.hpp + ../clp/ir/parsing.cpp + ../clp/ir/parsing.hpp + ../clp/ir/types.hpp ../clp/MySQLDB.cpp ../clp/MySQLDB.hpp ../clp/MySQLParamBindings.cpp @@ -21,11 +51,19 @@ set( ../clp/MySQLPreparedStatement.hpp ../clp/networking/socket_utils.cpp ../clp/networking/socket_utils.hpp + ../clp/ReadOnlyMemoryMappedFile.cpp + ../clp/ReadOnlyMemoryMappedFile.hpp ../clp/ReaderInterface.cpp ../clp/ReaderInterface.hpp ../clp/streaming_archive/ArchiveMetadata.cpp ../clp/streaming_archive/ArchiveMetadata.hpp + ../clp/streaming_compression/zstd/Decompressor.cpp + ../clp/streaming_compression/zstd/Decompressor.hpp + ../clp/time_types.hpp ../clp/TraceableException.hpp + ../clp/type_utils.hpp + ../clp/utf8_utils.cpp + ../clp/utf8_utils.hpp ../clp/WriterInterface.cpp ../clp/WriterInterface.hpp ) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 4cfe017ac..0b31af3ab 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -106,11 +106,15 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { std::cerr << " c - compress" << std::endl; std::cerr << " x - decompress" << std::endl; std::cerr << " s - search" << std::endl; + std::cerr << " r - JSON to IR Format" << std::endl; + std::cerr << " i - compress IR format" << std::endl; std::cerr << std::endl; std::cerr << "Try " << " c --help OR" << " x --help OR" - << " s --help for command-specific details." << std::endl; + << " s --help OR" + << " r --help OR" + << " i --help for command-specific details." << std::endl; po::options_description visible_options; visible_options.add(general_options); @@ -125,6 +129,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { case (char)Command::Compress: case (char)Command::Extract: case (char)Command::Search: + case (char)Command::JsonToIr: + case (char)Command::IrCompress: m_command = (Command)command_input; break; default: @@ -242,6 +248,257 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { throw std::invalid_argument("No input paths specified."); } + // Parse and validate global metadata DB config + if (false == metadata_db_config_file_path.empty()) { + clp::GlobalMetadataDBConfig metadata_db_config; + try { + metadata_db_config.parse_config_file(metadata_db_config_file_path); + } catch (std::exception& e) { + SPDLOG_ERROR("Failed to validate metadata database config - {}.", e.what()); + return ParsingResult::Failure; + } + + if (clp::GlobalMetadataDBConfig::MetadataDBType::MySQL + != metadata_db_config.get_metadata_db_type()) + { + SPDLOG_ERROR( + "Invalid metadata database type for {}; only supported type is MySQL.", + m_program_name + ); + return ParsingResult::Failure; + } + + m_metadata_db_config = std::move(metadata_db_config); + } + } else if (Command::IrCompress == m_command) { + po::options_description compression_positional_options; + // clang-format off + compression_positional_options.add_options()( + "archives-dir", + po::value(&m_archives_dir)->value_name("DIR"), + "output directory" + )( + "input-paths", + po::value>(&m_file_paths)->value_name("PATHS"), + "input paths" + ); + // clang-format on + + po::options_description compression_options("Compression options"); + std::string metadata_db_config_file_path; + std::string input_path_list_file_path; + // clang-format off + compression_options.add_options()( + "compression-level", + po::value(&m_compression_level)->value_name("LEVEL")-> + default_value(m_compression_level), + "1 (fast/low compression) to 9 (slow/high compression)." + )( + "target-encoded-size", + po::value(&m_target_encoded_size)->value_name("TARGET_ENCODED_SIZE")-> + default_value(m_target_encoded_size), + "Target size (B) for the dictionaries and encoded messages before a new " + "archive is created." + )( + "max-document-size", + po::value(&m_max_document_size)->value_name("DOC_SIZE")-> + default_value(m_max_document_size), + "Maximum allowed size (B) for a single document before compression fails." + )( + "timestamp-key", + po::value(&m_timestamp_key)->value_name("TIMESTAMP_COLUMN_KEY")-> + default_value(m_timestamp_key), + "Path (e.g. x.y) for the field containing the log event's timestamp." + )( + "db-config-file", + po::value(&metadata_db_config_file_path)->value_name("FILE")-> + default_value(metadata_db_config_file_path), + "Global metadata DB YAML config" + )( + "files-from,f", + po::value(&input_path_list_file_path) + ->value_name("FILE") + ->default_value(input_path_list_file_path), + "Compress files specified in FILE" + )( + "print-archive-stats", + po::bool_switch(&m_print_archive_stats), + "Print statistics (json) about the archive after it's compressed." + ); + // clang-format on + + po::positional_options_description positional_options; + positional_options.add("archives-dir", 1); + positional_options.add("input-paths", -1); + + po::options_description all_compression_options; + all_compression_options.add(compression_options); + all_compression_options.add(compression_positional_options); + + std::vector unrecognized_options + = po::collect_unrecognized(parsed.options, po::include_positional); + unrecognized_options.erase(unrecognized_options.begin()); + po::store( + po::command_line_parser(unrecognized_options) + .options(all_compression_options) + .positional(positional_options) + .run(), + parsed_command_line_options + ); + po::notify(parsed_command_line_options); + + if (parsed_command_line_options.count("help")) { + print_ir_compression_usage(); + + std::cerr << "Examples:\n"; + std::cerr << " # Compress file1.ir and dir1 into archives-dir\n"; + std::cerr << " " << m_program_name << " i archives-dir file1.ir dir1\n"; + + po::options_description visible_options; + visible_options.add(general_options); + visible_options.add(compression_options); + std::cerr << visible_options << '\n'; + return ParsingResult::InfoCommand; + } + + if (m_archives_dir.empty()) { + throw std::invalid_argument("No archives directory specified."); + } + + if (false == input_path_list_file_path.empty()) { + if (false == read_paths_from_file(input_path_list_file_path, m_file_paths)) { + SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path); + return ParsingResult::Failure; + } + } + + if (m_file_paths.empty()) { + throw std::invalid_argument("No input paths specified."); + } + + // Parse and validate global metadata DB config + if (false == metadata_db_config_file_path.empty()) { + clp::GlobalMetadataDBConfig metadata_db_config; + try { + metadata_db_config.parse_config_file(metadata_db_config_file_path); + } catch (std::exception& e) { + SPDLOG_ERROR("Failed to validate metadata database config - {}.", e.what()); + return ParsingResult::Failure; + } + + if (clp::GlobalMetadataDBConfig::MetadataDBType::MySQL + != metadata_db_config.get_metadata_db_type()) + { + SPDLOG_ERROR( + "Invalid metadata database type for {}; only supported type is MySQL.", + m_program_name + ); + return ParsingResult::Failure; + } + + m_metadata_db_config = std::move(metadata_db_config); + } + } else if ((char)Command::JsonToIr == command_input) { + po::options_description compression_positional_options; + // clang-format off + compression_positional_options.add_options()( + "ir-dir", + po::value(&m_archives_dir)->value_name("DIR"), + "output directory" + )( + "input-paths", + po::value>(&m_file_paths)->value_name("PATHS"), + "input paths" + ); + // clang-format on + + po::options_description compression_options("Compression options"); + std::string metadata_db_config_file_path; + std::string input_path_list_file_path; + // clang-format off + compression_options.add_options()( + "compression-level", + po::value(&m_compression_level)->value_name("LEVEL")-> + default_value(m_compression_level), + "1 (fast/low compression) to 9 (slow/high compression)." + )( + "max-document-size", + po::value(&m_max_document_size)->value_name("DOC_SIZE")-> + default_value(m_max_document_size), + "Maximum allowed size (B) for a single document before ir generation fails." + )( + "max-ir-buffer-size", + po::value(&m_max_ir_buffer_size)->value_name("BUFFER_SIZE")-> + default_value(m_max_ir_buffer_size), + "Maximum allowed size (B) for an in memory IR buffer befroe being written to file." + )( + "encoding-type", + po::value(&m_encoding_type)->value_name("ENCODING_TYPE")-> + default_value(m_encoding_type), + "4 (four byte encoding) or 8 (eight byte encoding)" + )( + "db-config-file", + po::value(&metadata_db_config_file_path)->value_name("FILE")-> + default_value(metadata_db_config_file_path), + "Global metadata DB YAML config" + )( + "files-from,f", + po::value(&input_path_list_file_path) + ->value_name("FILE") + ->default_value(input_path_list_file_path), + "Compress files specified in FILE" + ); + // clang-format on + + po::positional_options_description positional_options; + positional_options.add("ir-dir", 1); + positional_options.add("input-paths", -1); + + po::options_description all_compression_options; + all_compression_options.add(compression_options); + all_compression_options.add(compression_positional_options); + + std::vector unrecognized_options + = po::collect_unrecognized(parsed.options, po::include_positional); + unrecognized_options.erase(unrecognized_options.begin()); + po::store( + po::command_line_parser(unrecognized_options) + .options(all_compression_options) + .positional(positional_options) + .run(), + parsed_command_line_options + ); + po::notify(parsed_command_line_options); + + if (parsed_command_line_options.count("help")) { + print_json_to_ir_usage(); + + std::cerr << "Examples:\n"; + std::cerr << " # Parse file1.json and dir1 into irs-dir\n"; + std::cerr << " " << m_program_name << " r irs-dir file1.json dir1\n"; + + po::options_description visible_options; + visible_options.add(general_options); + visible_options.add(compression_options); + std::cerr << visible_options << '\n'; + return ParsingResult::InfoCommand; + } + + if (m_archives_dir.empty()) { + throw std::invalid_argument("No IRs directory specified."); + } + + if (false == input_path_list_file_path.empty()) { + if (false == read_paths_from_file(input_path_list_file_path, m_file_paths)) { + SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path); + return ParsingResult::Failure; + } + } + + if (m_file_paths.empty()) { + throw std::invalid_argument("No input paths specified."); + } + // Parse and validate global metadata DB config if (false == metadata_db_config_file_path.empty()) { clp::GlobalMetadataDBConfig metadata_db_config; @@ -786,4 +1043,12 @@ void CommandLineArguments::print_search_usage() const { " [OUTPUT_HANDLER [OUTPUT_HANDLER_OPTIONS]]" << std::endl; } + +void CommandLineArguments::print_json_to_ir_usage() const { + std::cerr << "Usage: " << m_program_name << " r [OPTIONS] IRS_DIR [FILE/DIR ...]\n"; +} + +void CommandLineArguments::print_ir_compression_usage() const { + std::cerr << "Usage: " << m_program_name << " i [OPTIONS] ARCHIVES_DIR [FILE/DIR ...]\n"; +} } // namespace clp_s diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 0f3d8c556..91e0eecb1 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -26,7 +26,9 @@ class CommandLineArguments { enum class Command : char { Compress = 'c', Extract = 'x', - Search = 's' + Search = 's', + JsonToIr = 'r', + IrCompress = 'i' }; enum class OutputHandlerType : uint8_t { @@ -60,6 +62,10 @@ class CommandLineArguments { size_t get_max_document_size() const { return m_max_document_size; } + [[nodiscard]] auto get_max_ir_buffer_size() const -> size_t { return m_max_ir_buffer_size; } + + [[nodiscard]] auto get_encoding_type() const -> int { return m_encoding_type; } + [[nodiscard]] bool print_archive_stats() const { return m_print_archive_stats; } std::string const& get_mongodb_uri() const { return m_mongodb_uri; } @@ -157,6 +163,10 @@ class CommandLineArguments { void print_search_usage() const; + void print_json_to_ir_usage() const; + + void print_ir_compression_usage() const; + // Variables std::string m_program_name; Command m_command; @@ -173,7 +183,8 @@ class CommandLineArguments { bool m_structurize_arrays{false}; bool m_ordered_decompression{false}; size_t m_ordered_chunk_size{0}; - + int m_encoding_type{8}; + size_t m_max_ir_buffer_size{512ULL * 1024 * 1024}; // Metadata db variables std::optional m_metadata_db_config; diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp index a68062958..eef54019d 100644 --- a/components/core/src/clp_s/JsonParser.cpp +++ b/components/core/src/clp_s/JsonParser.cpp @@ -1,15 +1,29 @@ #include "JsonParser.hpp" -#include +#include +#include +#include #include +#include #include #include -#include "archive_constants.hpp" +#include "../clp/ffi/SchemaTree.hpp" +#include "../clp/ffi/SchemaTreeNode.hpp" +#include "../clp/ffi/utils.hpp" +#include "../clp/ffi/Value.hpp" +#include "../clp/ir/types.hpp" +#include "../clp/streaming_compression/zstd/Decompressor.hpp" +#include "DictionaryWriter.hpp" #include "JsonFileIterator.hpp" +#include "ParsedMessage.hpp" +#include "SchemaTree.hpp" + +using namespace simdjson; namespace clp_s { + JsonParser::JsonParser(JsonParserOption const& option) : m_num_messages(0), m_target_encoded_size(option.target_encoded_size), @@ -520,6 +534,265 @@ bool JsonParser::parse() { return true; } +auto JsonParser::get_archive_node_type( + clp::ffi::SchemaTreeNode::Type ir_node_type, + bool node_has_value, + std::optional const& node_value +) -> NodeType { + // figure out what type the node is in archive node type + NodeType archive_node_type = NodeType::Unknown; + switch (ir_node_type) { + case clp::ffi::SchemaTreeNode::Type::Int: + archive_node_type = NodeType::Integer; + break; + case clp::ffi::SchemaTreeNode::Type::Float: + archive_node_type = NodeType::Float; + break; + case clp::ffi::SchemaTreeNode::Type::Bool: + archive_node_type = NodeType::Boolean; + break; + case clp::ffi::SchemaTreeNode::Type::UnstructuredArray: + archive_node_type = NodeType::UnstructuredArray; + break; + case clp::ffi::SchemaTreeNode::Type::Str: + if (node_value && node_value->is()) { + archive_node_type = NodeType::VarString; + } else { + archive_node_type = NodeType::ClpString; + } + break; + case clp::ffi::SchemaTreeNode::Type::Obj: + if (node_has_value) { + if (node_value->is_null()) { + archive_node_type = NodeType::NullValue; + } else { + archive_node_type = NodeType::Object; + } + } else { + archive_node_type = NodeType::Object; + } + break; + default: + break; + } + return archive_node_type; +} + +auto JsonParser::get_archive_node_id( + std::unordered_map>>& + ir_node_to_archive_node_unordered_map, + int32_t ir_node_id, + NodeType archive_node_type, + clp::ffi::SchemaTree const& ir_tree +) -> int { + auto unordered_map_location = ir_node_to_archive_node_unordered_map.find(ir_node_id); + if (ir_node_to_archive_node_unordered_map.end() != unordered_map_location) { + auto translation_vector = unordered_map_location->second; + for (int i = 0; i < translation_vector.size(); i++) { + if (translation_vector[i].first == archive_node_type) { + return translation_vector[i].second; + } + } + } + + auto const& curr_node = ir_tree.get_node(ir_node_id); + int32_t parent_node_id{-1}; + if (ir_node_id != curr_node.get_parent_id()) { + parent_node_id = get_archive_node_id( + ir_node_to_archive_node_unordered_map, + curr_node.get_parent_id(), + NodeType::Object, + ir_tree + ); + } + auto validated_escaped_key + = clp::ffi::validate_and_escape_utf8_string(curr_node.get_key_name()); + std::string node_key = ""; + if (validated_escaped_key.has_value()) { + node_key = validated_escaped_key.value(); + } else { + throw "Key is not UTF-8 compliant"; + } + int curr_node_archive_id + = m_archive_writer->add_node(parent_node_id, archive_node_type, node_key); + auto p = std::make_pair(archive_node_type, curr_node_archive_id); + if (ir_node_to_archive_node_unordered_map.end() != unordered_map_location) { + unordered_map_location->second.push_back(p); + } else { + std::vector> v; + v.push_back(p); + ir_node_to_archive_node_unordered_map.emplace(ir_node_id, v); + } + return curr_node_archive_id; +} + +void JsonParser::parse_kv_log_event( + KeyValuePairLogEvent const& kv, + std::unordered_map>>& + ir_node_to_archive_node_unordered_map +) { + clp::ffi::SchemaTree const& tree = kv.get_schema_tree(); + for (auto const& pair : kv.get_node_id_value_pairs()) { + clp::ffi::SchemaTreeNode const& tree_node = tree.get_node(pair.first); + clp::ffi::SchemaTreeNode::Type ir_node_type = tree_node.get_type(); + bool node_has_value = pair.second.has_value(); + NodeType archive_node_type = NodeType::Unknown; + if (node_has_value) { + archive_node_type + = get_archive_node_type(ir_node_type, node_has_value, pair.second.value()); + } else { + archive_node_type = get_archive_node_type(ir_node_type, node_has_value, {}); + } + int node_id; + try { + node_id = get_archive_node_id( + ir_node_to_archive_node_unordered_map, + pair.first, + archive_node_type, + tree + ); + } catch (...) { + throw; + } + + switch (archive_node_type) { + case NodeType::Integer: { + int64_t i64_value = pair.second.value().get_immutable_view(); + m_current_parsed_message.add_value(node_id, i64_value); + } break; + case NodeType::Float: { + double d_value = pair.second.value().get_immutable_view(); + m_current_parsed_message.add_value(node_id, d_value); + } break; + case NodeType::Boolean: { + bool b_value = pair.second.value().get_immutable_view(); + m_current_parsed_message.add_value(node_id, b_value); + } break; + case NodeType::VarString: { + auto validated_escaped_string = clp::ffi::validate_and_escape_utf8_string( + pair.second.value().get_immutable_view() + ); + std::string str = ""; + if (validated_escaped_string.has_value()) { + str = validated_escaped_string.value(); + } else { + throw "String is not utf8 compliant"; + } + m_current_parsed_message.add_value(node_id, str); + } break; + case NodeType::ClpString: { + std::string encoded_str = ""; + std::string decodedValue = ""; + if (pair.second.value().is()) { + decodedValue = pair.second.value() + .get_immutable_view() + .decode_and_unparse() + .value(); + + } else { + decodedValue = pair.second.value() + .get_immutable_view() + .decode_and_unparse() + .value(); + } + auto validated_escaped_encoded_string + = clp::ffi::validate_and_escape_utf8_string(decodedValue.c_str()); + if (validated_escaped_encoded_string.has_value()) { + encoded_str = validated_escaped_encoded_string.value(); + } else { + throw "Encoded string is not utf8 compliant"; + } + m_current_parsed_message.add_value(node_id, encoded_str); + } break; + case NodeType::UnstructuredArray: { + std::string array_str; + if (pair.second.value().is()) { + array_str = pair.second.value() + .get_immutable_view() + .decode_and_unparse() + .value(); + } else { + array_str = pair.second.value() + .get_immutable_view() + .decode_and_unparse() + .value(); + } + m_current_parsed_message.add_value(node_id, array_str); + break; + } + default: + // Don't need to add value for obj or null + break; + } + m_current_schema.insert_ordered(node_id); + } + + int32_t current_schema_id = m_archive_writer->add_schema(m_current_schema); + m_current_parsed_message.set_id(current_schema_id); + m_archive_writer->append_message(current_schema_id, m_current_schema, m_current_parsed_message); +} + +auto JsonParser::parse_from_ir() -> bool { + std::unordered_map>> + ir_node_to_archive_node_unordered_map; + + for (auto& file_path : m_file_paths) { + int fsize = std::filesystem::file_size(file_path); + if (0 == fsize) { + m_archive_writer->close(); + return false; + } + clp::streaming_compression::zstd::Decompressor zd; + zd.open(file_path); + + auto deserializer_result = Deserializer::create(zd); + if (deserializer_result.has_error()) { + zd.close(); + m_archive_writer->close(); + return false; + } + auto& deserializer = deserializer_result.value(); + + m_num_messages = 0; + do { + auto const kv_log_event_result = deserializer.deserialize_to_next_log_event(zd); + + if (kv_log_event_result.has_error()) { + if (kv_log_event_result.error() == std::errc::no_message_available + || kv_log_event_result.error() == std::errc::result_out_of_range) + { + break; + } + } + + m_current_schema.clear(); + auto const& kv_log_event = kv_log_event_result.value(); + try { + parse_kv_log_event(kv_log_event, ir_node_to_archive_node_unordered_map); + } catch (std::string msg) { + SPDLOG_ERROR("ERROR: {}" + msg); + zd.close(); + return false; + } catch (...) { + SPDLOG_ERROR("ERROR: Encountered error while parsing a kv log event"); + zd.close(); + return false; + } + m_num_messages++; + if (m_archive_writer->get_data_size() >= m_target_encoded_size) { + ir_node_to_archive_node_unordered_map.clear(); + split_archive(); + } + + m_current_parsed_message.clear(); + + } while (true); + ir_node_to_archive_node_unordered_map.clear(); + zd.close(); + } + return true; +} + void JsonParser::store() { m_archive_writer->close(); } diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp index 84aa27fef..956373ae2 100644 --- a/components/core/src/clp_s/JsonParser.hpp +++ b/components/core/src/clp_s/JsonParser.hpp @@ -1,29 +1,32 @@ #ifndef CLP_S_JSONPARSER_HPP #define CLP_S_JSONPARSER_HPP -#include +#include +#include #include +#include #include #include #include -#include +#include "../clp/BufferReader.hpp" +#include "../clp/ffi/ir_stream/Deserializer.hpp" +#include "../clp/ffi/KeyValuePairLogEvent.hpp" +#include "../clp/ffi/SchemaTree.hpp" +#include "../clp/ffi/SchemaTreeNode.hpp" +#include "../clp/ffi/Value.hpp" #include "../clp/GlobalMySQLMetadataDB.hpp" +#include "../clp/type_utils.hpp" #include "ArchiveWriter.hpp" -#include "DictionaryWriter.hpp" -#include "FileReader.hpp" -#include "FileWriter.hpp" #include "ParsedMessage.hpp" #include "Schema.hpp" -#include "SchemaMap.hpp" #include "SchemaTree.hpp" -#include "SchemaWriter.hpp" -#include "TimestampDictionaryWriter.hpp" -#include "Utils.hpp" -#include "ZstdCompressor.hpp" -using namespace simdjson; +using clp::BufferReader; +using clp::ffi::ir_stream::Deserializer; +using clp::ffi::KeyValuePairLogEvent; +using clp::size_checked_pointer_cast; namespace clp_s { struct JsonParserOption { @@ -38,6 +41,15 @@ struct JsonParserOption { std::shared_ptr metadata_db; }; +struct JsonToIrParserOption { + std::vector file_paths; + std::string irs_dir; + size_t max_document_size; + size_t max_ir_buffer_size; + int compression_level; + int encoding; +}; + class JsonParser { public: class OperationFailed : public TraceableException { @@ -50,6 +62,8 @@ class JsonParser { // Constructor explicit JsonParser(JsonParserOption const& option); + JsonParser(JsonToIrParserOption const& option); + // Destructor ~JsonParser() = default; @@ -59,6 +73,12 @@ class JsonParser { */ [[nodiscard]] bool parse(); + /** + * Parses the Key Value IR Stream and stores the data in the archive. + * @return whether the IR Stream was parsed successfully + */ + [[nodiscard]] auto parse_from_ir() -> bool; + /** * Writes the metadata and archive data to disk. */ @@ -74,6 +94,47 @@ class JsonParser { */ void parse_line(ondemand::value line, int32_t parent_node_id, std::string const& key); + /** + * Determines the archive node type based on the IR node type and value. + * @param ir_node_type schema node type from the IR stream + * @param node_has_value Boolean that says whether or not the node has value. + * @param node_value The IR schema node value if the node has value + * @return The clp-s archive Node Type that should be used for the archive node + */ + static auto get_archive_node_type( + clp::ffi::SchemaTreeNode::Type ir_node_type, + bool node_has_value, + std::optional const& node_value + ) -> NodeType; + + /** + * Get archive node id for ir node + * @param ir_node_to_archive_node_unordered_map cache of node id conversions between + * deserializer schema tree nodes and archive schema tree nodes + * @param ir_node_id ID of the IR node + * @param archive_node_type Type of the archive node + * @param ir_tree The IR schema tree + */ + auto get_archive_node_id( + std::unordered_map>>& + ir_node_to_archive_node_unordered_map, + int32_t ir_node_id, + NodeType archive_node_type, + clp::ffi::SchemaTree const& ir_tree + ) -> int; + + /** + * Parses a Key Value Log Event + * @param kv the key value log event + * @param ir_node_to_archive_node_unordered_map cache of node id conversions between + * deserializer schema tree nodes and archive schema tree nodes + */ + void parse_kv_log_event( + KeyValuePairLogEvent const& kv, + std::unordered_map>>& + ir_node_to_archive_node_unordered_map + ); + /** * Parses an array within a JSON line * @param line the JSON array diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 0e0401ad1..554422adf 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -1,6 +1,7 @@ +#include #include #include -#include +#include #include #include #include @@ -8,14 +9,17 @@ #include #include +#include #include #include +#include "../clp/ffi/ir_stream/Serializer.hpp" #include "../clp/GlobalMySQLMetadataDB.hpp" #include "../clp/streaming_archive/ArchiveMetadata.hpp" #include "../reducer/network_utils.hpp" #include "CommandLineArguments.hpp" #include "Defs.hpp" +#include "FileWriter.hpp" #include "JsonConstructor.hpp" #include "JsonParser.hpp" #include "ReaderUtils.hpp" @@ -33,14 +37,17 @@ #include "TimestampPattern.hpp" #include "TraceableException.hpp" #include "Utils.hpp" +#include "ZstdCompressor.hpp" using namespace clp_s::search; +using clp::ffi::ir_stream::Serializer; using clp_s::cArchiveFormatDevelopmentVersionFlag; using clp_s::cEpochTimeMax; using clp_s::cEpochTimeMin; using clp_s::CommandLineArguments; namespace { + /** * Compresses the input files specified by the command line arguments into an archive. * @param command_line_arguments @@ -48,6 +55,54 @@ namespace { */ bool compress(CommandLineArguments const& command_line_arguments); +template +auto flush_and_clear_serializer_buffer( + Serializer& serializer, + std::vector& byte_buf +) -> void; + +template +auto unpack_and_serialize_msgpack_bytes( + std::vector const& msgpack_bytes, + Serializer& serializer +) -> bool; + +/** + * Given user specified options and a file path to a JSON file calls the serailizer one each JSON + * entry to serialize into IR + * @param option + * @param path + * @return Whether serialization was successful + */ +template +auto run_serializer(clp_s::JsonToIrParserOption const& option, std::string path); + +/** + * Iterates over the input JSON files specified by the command line arguments to generate and IR + * file for each one. + * @param command_line_arguments + * @return Whether generation was successful + */ +auto generate_ir(CommandLineArguments const& command_line_arguments) -> bool; + +/** + * Fill in JsonParserOption instance based on command line user input + * @param command_line_arguments + * @param option + * @return Whether setup was succesful + */ +auto setup_compression_options( + CommandLineArguments const& command_line_arguments, + clp_s::JsonParserOption& option +) -> bool; + +/** + * Compresses the input IR files specified by the command line arguments into an archive. + * @param command_line_arguments + * @return Whether compression was successful + */ +auto ir_compress(CommandLineArguments const& command_line_arguments) -> bool; + /** * Decompresses the archive specified by the given JsonConstructorOption. * @param json_constructor_option @@ -116,6 +171,215 @@ bool compress(CommandLineArguments const& command_line_arguments) { return true; } +template +auto flush_and_clear_serializer_buffer( + Serializer& serializer, + std::vector& byte_buf +) -> void { + auto const view{serializer.get_ir_buf_view()}; + byte_buf.insert(byte_buf.cend(), view.begin(), view.end()); + serializer.clear_ir_buf(); +} + +template +auto unpack_and_serialize_msgpack_bytes( + std::vector const& msgpack_bytes, + Serializer& serializer +) -> bool { + try { + auto const msgpack_obj_handle{msgpack::unpack( + clp::size_checked_pointer_cast(msgpack_bytes.data()), + msgpack_bytes.size() + )}; + auto const msgpack_obj{msgpack_obj_handle.get()}; + if (msgpack::type::MAP != msgpack_obj.type) { + return false; + } + return serializer.serialize_msgpack_map(msgpack_obj.via.map); + } catch (std::exception const& e) { + SPDLOG_ERROR("Failed to unpack msgpack bytes: {}", e.what()); + return false; + } +} + +template +auto run_serializer(clp_s::JsonToIrParserOption const& option, std::string path) { + auto result{Serializer::create()}; + if (result.has_error()) { + SPDLOG_ERROR("Failed to create Serializer"); + return false; + } + auto& serializer{result.value()}; + std::vector ir_buf; + flush_and_clear_serializer_buffer(serializer, ir_buf); + + std::ifstream in_file; + in_file.open(path, std::ifstream::in); + + std::filesystem::path input_path{path}; + std::string filename = input_path.filename().string(); + std::string out_path = option.irs_dir + "/" + filename + ".ir"; + + clp_s::FileWriter out_file; + out_file.open(out_path, clp_s::FileWriter::OpenMode::CreateForWriting); + clp_s::ZstdCompressor zc; + try { + zc.open(out_file, option.compression_level); + } catch (clp_s::ZstdCompressor::OperationFailed& error) { + SPDLOG_ERROR("Failed to open ZSTDcompressor - {}", error.what()); + in_file.close(); + out_file.close(); + return false; + } + + std::string line = ""; + size_t total_size = 0; + + if (in_file.is_open()) { + while (getline(in_file, line)) { + try { + auto j_obj = nlohmann::json::parse(line); + if (false + == unpack_and_serialize_msgpack_bytes( + nlohmann::json::to_msgpack(j_obj), + serializer + )) + { + SPDLOG_ERROR("Failed to serialize msgpack bytes for line: {}", line); + in_file.close(); + out_file.close(); + zc.close(); + return false; + } + flush_and_clear_serializer_buffer(serializer, ir_buf); + if (ir_buf.size() >= option.max_ir_buffer_size) { + total_size = total_size + ir_buf.size(); + zc.write(reinterpret_cast(ir_buf.data()), ir_buf.size()); + zc.flush(); + ir_buf.clear(); + } + } catch (nlohmann::json::parse_error const& e) { + SPDLOG_ERROR("JSON parsing error: {}", e.what()); + in_file.close(); + out_file.close(); + zc.close(); + return false; + } catch (std::exception const& e) { + SPDLOG_ERROR("Error during serialization: {}", e.what()); + in_file.close(); + out_file.close(); + zc.close(); + return false; + } + } + total_size = total_size + ir_buf.size(); + zc.write(reinterpret_cast(ir_buf.data()), ir_buf.size()); + zc.flush(); + ir_buf.clear(); + in_file.close(); + zc.close(); + out_file.close(); + } + + return true; +} + +auto generate_ir(CommandLineArguments const& command_line_arguments) -> bool { + auto irs_dir = std::filesystem::path(command_line_arguments.get_archives_dir()); + + // Create output directory in case it doesn't exist + try { + std::filesystem::create_directory(irs_dir.string()); + } catch (std::exception& e) { + SPDLOG_ERROR("Failed to create archives directory {} - {}", irs_dir.string(), e.what()); + return false; + } + clp_s::JsonToIrParserOption option{}; + option.file_paths = command_line_arguments.get_file_paths(); + option.irs_dir = irs_dir.string(); + option.max_document_size = command_line_arguments.get_max_document_size(); + option.max_ir_buffer_size = command_line_arguments.get_max_ir_buffer_size(); + option.compression_level = command_line_arguments.get_compression_level(); + option.encoding = command_line_arguments.get_encoding_type(); + + if (false == clp_s::FileUtils::validate_path(option.file_paths)) { + SPDLOG_ERROR("Invalid file path(s) provided"); + return false; + } + + std::vector all_file_paths; + for (auto& file_path : option.file_paths) { + clp_s::FileUtils::find_all_files(file_path, all_file_paths); + } + + for (auto& path : all_file_paths) { + bool success; + if (option.encoding == 4) { + success = run_serializer(option, path); + } else { + success = run_serializer(option, path); + } + if (false == success) { + return false; + } + } + return true; +} + +auto setup_compression_options( + CommandLineArguments const& command_line_arguments, + clp_s::JsonParserOption& option +) -> bool { + auto archives_dir = std::filesystem::path(command_line_arguments.get_archives_dir()); + // Create output directory in case it doesn't exist + try { + std::filesystem::create_directory(archives_dir.string()); + } catch (std::exception& e) { + SPDLOG_ERROR( + "Failed to create archives directory {} - {}", + archives_dir.string(), + e.what() + ); + return false; + } + option.file_paths = command_line_arguments.get_file_paths(); + option.archives_dir = archives_dir.string(); + option.target_encoded_size = command_line_arguments.get_target_encoded_size(); + option.max_document_size = command_line_arguments.get_max_document_size(); + option.compression_level = command_line_arguments.get_compression_level(); + option.timestamp_key = command_line_arguments.get_timestamp_key(); + option.print_archive_stats = command_line_arguments.print_archive_stats(); + + auto const& db_config_container = command_line_arguments.get_metadata_db_config(); + if (db_config_container.has_value()) { + auto const& db_config = db_config_container.value(); + option.metadata_db = std::make_shared( + db_config.get_metadata_db_host(), + db_config.get_metadata_db_port(), + db_config.get_metadata_db_username(), + db_config.get_metadata_db_password(), + db_config.get_metadata_db_name(), + db_config.get_metadata_table_prefix() + ); + } + return true; +} + +auto ir_compress(CommandLineArguments const& command_line_arguments) -> bool { + clp_s::JsonParserOption option{}; + if (false == setup_compression_options(command_line_arguments, option)) { + return false; + } + + clp_s::JsonParser parser(option); + if (false == parser.parse_from_ir()) { + SPDLOG_ERROR("Encountered error while parsing input"); + return false; + } + parser.store(); + return true; +} + void decompress_archive(clp_s::JsonConstructorOption const& json_constructor_option) { clp_s::JsonConstructor constructor(json_constructor_option); constructor.store(); @@ -263,6 +527,14 @@ int main(int argc, char const* argv[]) { if (false == compress(command_line_arguments)) { return 1; } + } else if (CommandLineArguments::Command::IrCompress == command_line_arguments.get_command()) { + if (false == ir_compress(command_line_arguments)) { + return 1; + } + } else if (CommandLineArguments::Command::JsonToIr == command_line_arguments.get_command()) { + if (false == generate_ir(command_line_arguments)) { + return 1; + } } else if (CommandLineArguments::Command::Extract == command_line_arguments.get_command()) { auto const& archives_dir = command_line_arguments.get_archives_dir(); if (false == std::filesystem::is_directory(archives_dir)) {