diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index 1656a5d59..9ca0c947e 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -8,11 +8,35 @@ set( ../clp/database_utils.hpp ../clp/Defs.h ../clp/ErrorCode.hpp + ../clp/ffi/ir_stream/decoding_methods.cpp + ../clp/ffi/ir_stream/decoding_methods.hpp + ../clp/ffi/ir_stream/Deserializer.hpp + ../clp/ffi/ir_stream/encoding_methods.cpp + ../clp/ffi/ir_stream/encoding_methods.hpp + ../clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp + ../clp/ffi/ir_stream/ir_unit_deserialization_methods.hpp + ../clp/ffi/ir_stream/Serializer.cpp + ../clp/ffi/ir_stream/Serializer.hpp + ../clp/ffi/ir_stream/utils.cpp + ../clp/ffi/ir_stream/utils.hpp + ../clp/ffi/KeyValuePairLogEvent.cpp + ../clp/ffi/KeyValuePairLogEvent.hpp + ../clp/ffi/SchemaTree.cpp + ../clp/ffi/SchemaTree.hpp + ../clp/ffi/utils.cpp + ../clp/ffi/utils.hpp + ../clp/ffi/Value.hpp + ../clp/FileDescriptor.cpp + ../clp/FileDescriptor.hpp ../clp/GlobalMetadataDB.hpp ../clp/GlobalMetadataDBConfig.cpp ../clp/GlobalMetadataDBConfig.hpp ../clp/GlobalMySQLMetadataDB.cpp ../clp/GlobalMySQLMetadataDB.hpp + ../clp/ir/EncodedTextAst.cpp + ../clp/ir/EncodedTextAst.hpp + ../clp/ir/parsing.cpp + ../clp/ir/parsing.hpp ../clp/MySQLDB.cpp ../clp/MySQLDB.hpp ../clp/MySQLParamBindings.cpp @@ -23,9 +47,16 @@ set( ../clp/networking/socket_utils.hpp ../clp/ReaderInterface.cpp ../clp/ReaderInterface.hpp + ../clp/ReadOnlyMemoryMappedFile.cpp + ../clp/ReadOnlyMemoryMappedFile.hpp ../clp/streaming_archive/ArchiveMetadata.cpp ../clp/streaming_archive/ArchiveMetadata.hpp + ../clp/streaming_compression/zstd/Decompressor.cpp + ../clp/streaming_compression/zstd/Decompressor.hpp ../clp/TraceableException.hpp + ../clp/time_types.hpp + ../clp/utf8_utils.cpp + ../clp/utf8_utils.hpp ../clp/WriterInterface.cpp ../clp/WriterInterface.hpp ) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index c7fb9487e..0700f722d 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -274,6 +274,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); return ParsingResult::Failure; } + if (false == m_timestamp_key.empty()) { + SPDLOG_ERROR( + "Invalid combination of arguments; --file-type {} and " + "--timestamp-key can't be used together", + cKeyValueIrFileType + ); + return ParsingResult::Failure; + } } else { throw std::invalid_argument("Unknown FILE_TYPE: " + file_type); } diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp index d14a221b3..c917b1f09 100644 --- a/components/core/src/clp_s/JsonParser.cpp +++ b/components/core/src/clp_s/JsonParser.cpp @@ -1,15 +1,78 @@ #include "JsonParser.hpp" +#include #include +#include #include +#include +#include +#include #include #include +#include "../clp/ffi/ir_stream/decoding_methods.hpp" +#include "../clp/ffi/ir_stream/Deserializer.hpp" +#include "../clp/ffi/ir_stream/IrUnitType.hpp" +#include "../clp/ffi/KeyValuePairLogEvent.hpp" +#include "../clp/ffi/SchemaTree.hpp" +#include "../clp/ffi/utils.hpp" +#include "../clp/ffi/Value.hpp" +#include "../clp/ir/EncodedTextAst.hpp" +#include "../clp/streaming_compression/zstd/Decompressor.hpp" +#include "../clp/time_types.hpp" #include "archive_constants.hpp" +#include "ErrorCode.hpp" #include "JsonFileIterator.hpp" +using clp::ffi::ir_stream::Deserializer; +using clp::ffi::ir_stream::IRErrorCode; +using clp::ffi::KeyValuePairLogEvent; +using clp::UtcOffset; + namespace clp_s { +/** + * Class that implements `clp::ffi::ir_stream::IrUnitHandlerInterface` for Key-Value IR compression. + */ +class IrUnitHandler { +public: + [[nodiscard]] auto handle_log_event(KeyValuePairLogEvent&& log_event) -> IRErrorCode { + m_deserialized_log_event.emplace(std::move(log_event)); + return IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] static auto handle_utc_offset_change( + [[maybe_unused]] UtcOffset utc_offset_old, + [[maybe_unused]] UtcOffset utc_offset_new + ) -> IRErrorCode { + return IRErrorCode::IRErrorCode_Decode_Error; + } + + [[nodiscard]] auto handle_schema_tree_node_insertion( + [[maybe_unused]] clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator + ) -> IRErrorCode { + return IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] auto handle_end_of_stream() -> IRErrorCode { + m_is_complete = true; + return IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] auto get_deserialized_log_event( + ) const -> std::optional const& { + return m_deserialized_log_event; + } + + void clear() { m_is_complete = false; } + + [[nodiscard]] auto is_complete() const -> bool { return m_is_complete; } + +private: + std::optional m_deserialized_log_event; + bool m_is_complete{false}; +}; + JsonParser::JsonParser(JsonParserOption const& option) : m_num_messages(0), m_target_encoded_size(option.target_encoded_size), @@ -557,6 +620,309 @@ int32_t JsonParser::add_metadata_field(std::string_view const field_name, NodeTy return m_archive_writer->add_node(metadata_subtree_id, type, field_name); } +auto JsonParser::get_archive_node_type( + clp::ffi::SchemaTree const& tree, + std::pair> const& kv_pair +) -> NodeType { + clp::ffi::SchemaTree::Node const& tree_node = tree.get_node(kv_pair.first); + clp::ffi::SchemaTree::Node::Type const ir_node_type = tree_node.get_type(); + bool const node_has_value = kv_pair.second.has_value(); + clp::ffi::Value node_value{}; + if (node_has_value) { + node_value = kv_pair.second.value(); + } + switch (ir_node_type) { + case clp::ffi::SchemaTree::Node::Type::Int: + return NodeType::Integer; + case clp::ffi::SchemaTree::Node::Type::Float: + return NodeType::Float; + case clp::ffi::SchemaTree::Node::Type::Bool: + return NodeType::Boolean; + case clp::ffi::SchemaTree::Node::Type::UnstructuredArray: + return NodeType::UnstructuredArray; + case clp::ffi::SchemaTree::Node::Type::Str: + if (node_value.is()) { + return NodeType::VarString; + } + return NodeType::ClpString; + case clp::ffi::SchemaTree::Node::Type::Obj: + if (node_has_value && node_value.is_null()) { + return NodeType::NullValue; + } + return NodeType::Object; + default: + throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__); + } +} + +auto JsonParser::add_node_to_archive_and_translations( + uint32_t ir_node_id, + clp::ffi::SchemaTree::Node const& ir_node_to_add, + NodeType archive_node_type, + int32_t parent_node_id +) -> int { + auto validated_escaped_key + = clp::ffi::validate_and_escape_utf8_string(ir_node_to_add.get_key_name()); + std::string node_key; + if (validated_escaped_key.has_value()) { + node_key = validated_escaped_key.value(); + } else { + SPDLOG_ERROR("Key is not UTF-8 compliant: \"{}\"", ir_node_to_add.get_key_name()); + throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__); + } + int const curr_node_archive_id + = m_archive_writer->add_node(parent_node_id, archive_node_type, node_key); + + m_ir_node_to_archive_node_id_mapping.emplace( + std::make_pair(ir_node_id, archive_node_type), + curr_node_archive_id + ); + return curr_node_archive_id; +} + +auto JsonParser::get_archive_node_id( + uint32_t ir_node_id, + NodeType archive_node_type, + clp::ffi::SchemaTree const& ir_tree +) -> int { + int curr_node_archive_id{constants::cRootNodeId}; + auto flat_map_location + = m_ir_node_to_archive_node_id_mapping.find(std::pair{ir_node_id, archive_node_type}); + + if (m_ir_node_to_archive_node_id_mapping.end() != flat_map_location) { + return flat_map_location->second; + } + + std::vector ir_id_stack; + ir_id_stack.push_back(ir_node_id); + int32_t next_parent_archive_id{constants::cRootNodeId}; + NodeType next_node_type = archive_node_type; + + while (true) { + auto const& curr_node = ir_tree.get_node(ir_id_stack.back()); + auto parent_of_curr_node_id = curr_node.get_parent_id(); + if (parent_of_curr_node_id.has_value()) { + ir_id_stack.push_back(parent_of_curr_node_id.value()); + next_node_type = NodeType::Object; + } else { + next_parent_archive_id = constants::cRootNodeId; + break; + } + + flat_map_location = m_ir_node_to_archive_node_id_mapping.find( + std::pair{ir_id_stack.back(), next_node_type} + ); + if (m_ir_node_to_archive_node_id_mapping.end() != flat_map_location) { + curr_node_archive_id = flat_map_location->second; + next_parent_archive_id = flat_map_location->second; + ir_id_stack.pop_back(); + break; + } + } + + while (false == ir_id_stack.empty()) { + auto const& curr_node = ir_tree.get_node(ir_id_stack.back()); + if (1 == ir_id_stack.size()) { + curr_node_archive_id = add_node_to_archive_and_translations( + ir_id_stack.back(), + curr_node, + archive_node_type, + next_parent_archive_id + ); + } else { + curr_node_archive_id = add_node_to_archive_and_translations( + ir_id_stack.back(), + curr_node, + NodeType::Object, + next_parent_archive_id + ); + } + next_parent_archive_id = curr_node_archive_id; + ir_id_stack.pop_back(); + } + return curr_node_archive_id; +} + +void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) { + clp::ffi::SchemaTree const& tree = kv.get_user_gen_keys_schema_tree(); + for (auto const& pair : kv.get_user_gen_node_id_value_pairs()) { + NodeType const archive_node_type = get_archive_node_type(tree, pair); + auto const node_id = get_archive_node_id(pair.first, archive_node_type, tree); + + switch (archive_node_type) { + case NodeType::Integer: { + auto const i64_value + = pair.second.value().get_immutable_view(); + m_current_parsed_message.add_value(node_id, i64_value); + } break; + case NodeType::Float: { + auto const d_value + = pair.second.value().get_immutable_view(); + m_current_parsed_message.add_value(node_id, d_value); + } break; + case NodeType::Boolean: { + auto const b_value + = pair.second.value().get_immutable_view(); + m_current_parsed_message.add_value(node_id, b_value); + } break; + case NodeType::VarString: { + auto const validated_escaped_string = clp::ffi::validate_and_escape_utf8_string( + pair.second.value().get_immutable_view() + ); + std::string str; + if (validated_escaped_string.has_value()) { + str = validated_escaped_string.value(); + } else { + SPDLOG_ERROR( + "String is not utf8 compliant: \"{}\"", + pair.second.value().get_immutable_view() + ); + throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__); + } + m_current_parsed_message.add_value(node_id, str); + } break; + case NodeType::ClpString: { + std::string encoded_str; + std::string decoded_value; + if (pair.second.value().is()) { + decoded_value = pair.second.value() + .get_immutable_view() + .decode_and_unparse() + .value(); + + } else { + decoded_value = pair.second.value() + .get_immutable_view() + .decode_and_unparse() + .value(); + } + auto const validated_escaped_encoded_string + = clp::ffi::validate_and_escape_utf8_string(decoded_value.c_str()); + if (validated_escaped_encoded_string.has_value()) { + encoded_str = validated_escaped_encoded_string.value(); + } else { + SPDLOG_ERROR("Encoded string is not utf8 compliant: \"{}\"", decoded_value); + throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__); + } + m_current_parsed_message.add_value(node_id, encoded_str); + } break; + case NodeType::UnstructuredArray: { + std::string array_str; + if (pair.second.value().is()) { + array_str = pair.second.value() + .get_immutable_view() + .decode_and_unparse() + .value(); + } else { + array_str = pair.second.value() + .get_immutable_view() + .decode_and_unparse() + .value(); + } + m_current_parsed_message.add_value(node_id, array_str); + break; + } + default: + // Don't need to add value for obj or null + break; + } + m_current_schema.insert_ordered(node_id); + } + + int32_t const current_schema_id = m_archive_writer->add_schema(m_current_schema); + m_current_parsed_message.set_id(current_schema_id); + m_archive_writer->append_message(current_schema_id, m_current_schema, m_current_parsed_message); +} + +auto JsonParser::parse_from_ir() -> bool { + for (auto& file_path : m_file_paths) { + clp::streaming_compression::zstd::Decompressor decompressor; + size_t curr_pos{}; + size_t last_pos{}; + decompressor.open(file_path); + + auto deserializer_result{Deserializer::create(decompressor, IrUnitHandler{}) + }; + if (deserializer_result.has_error()) { + decompressor.close(); + m_archive_writer->close(); + return false; + } + auto& deserializer = deserializer_result.value(); + auto& ir_unit_handler{deserializer.get_ir_unit_handler()}; + + int32_t log_event_idx_node_id{}; + auto add_log_event_idx_node = [&]() { + if (m_record_log_order) { + log_event_idx_node_id + = add_metadata_field(constants::cLogEventIdxName, NodeType::Integer); + } + }; + add_log_event_idx_node(); + while (true) { + auto const kv_log_event_result{deserializer.deserialize_next_ir_unit(decompressor)}; + + if (kv_log_event_result.has_error()) { + m_archive_writer->close(); + decompressor.close(); + return false; + } + if (kv_log_event_result.value() == clp::ffi::ir_stream::IrUnitType::EndOfStream) { + break; + } + if (kv_log_event_result.value() == clp::ffi::ir_stream::IrUnitType::LogEvent) { + auto const kv_log_event = &(ir_unit_handler.get_deserialized_log_event().value()); + + m_current_schema.clear(); + + // Add log_event_idx field to metadata for record + if (m_record_log_order) { + m_current_parsed_message.add_value( + log_event_idx_node_id, + m_archive_writer->get_next_log_event_id() + ); + m_current_schema.insert_ordered(log_event_idx_node_id); + } + + try { + parse_kv_log_event(*kv_log_event); + } catch (std::exception const& e) { + SPDLOG_ERROR("Encountered error while parsing a kv log event - {}", e.what()); + m_archive_writer->close(); + decompressor.close(); + return false; + } + + if (m_archive_writer->get_data_size() >= m_target_encoded_size) { + m_ir_node_to_archive_node_id_mapping.clear(); + decompressor.try_get_pos(curr_pos); + m_archive_writer->increment_uncompressed_size(curr_pos - last_pos); + last_pos = curr_pos; + split_archive(); + add_log_event_idx_node(); + } + + ir_unit_handler.clear(); + m_current_parsed_message.clear(); + + } else if (kv_log_event_result.value() + == clp::ffi::ir_stream::IrUnitType::SchemaTreeNodeInsertion) + { + continue; + } else { + m_archive_writer->close(); + decompressor.close(); + return false; + } + } + m_ir_node_to_archive_node_id_mapping.clear(); + decompressor.try_get_pos(curr_pos); + m_archive_writer->increment_uncompressed_size(curr_pos - last_pos); + decompressor.close(); + } + return true; +} + void JsonParser::store() { m_archive_writer->close(); } diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp index c05ab9d60..a89c746c7 100644 --- a/components/core/src/clp_s/JsonParser.hpp +++ b/components/core/src/clp_s/JsonParser.hpp @@ -1,15 +1,22 @@ #ifndef CLP_S_JSONPARSER_HPP #define CLP_S_JSONPARSER_HPP +#include #include +#include #include #include +#include #include #include +#include #include #include +#include "../clp/ffi/KeyValuePairLogEvent.hpp" +#include "../clp/ffi/SchemaTree.hpp" +#include "../clp/ffi/Value.hpp" #include "../clp/GlobalMySQLMetadataDB.hpp" #include "ArchiveWriter.hpp" #include "CommandLineArguments.hpp" @@ -26,6 +33,7 @@ #include "ZstdCompressor.hpp" using namespace simdjson; +using clp::ffi::KeyValuePairLogEvent; namespace clp_s { struct JsonParserOption { @@ -65,6 +73,12 @@ class JsonParser { */ [[nodiscard]] bool parse(); + /** + * Parses the Key Value IR Stream and stores the data in the archive. + * @return whether the IR Stream was parsed successfully + */ + [[nodiscard]] auto parse_from_ir() -> bool; + /** * Writes the metadata and archive data to disk. */ @@ -80,6 +94,51 @@ class JsonParser { */ void parse_line(ondemand::value line, int32_t parent_node_id, std::string const& key); + /** + * Determines the archive node type based on the IR node type and value. + * @param ir_node_type schema node type from the IR stream + * @param node_has_value Boolean that says whether or not the node has value. + * @param node_value The IR schema node value if the node has value + * @return The NodeType that should be used for the archive node + */ + static auto get_archive_node_type( + clp::ffi::SchemaTree const& tree, + std::pair> const& + kv_pair + ) -> NodeType; + + /** + * Adds new schema node to archive and adds translation for IR node ID and NodeType to mapping + * @param ir_node_id ID of the IR node + * @param ir_node_to_add IR Schema Node that is being translated to archive + * @param archive_node_type Type of the archive node + * @param parent_node_id ID of the parent of the IR node + */ + auto add_node_to_archive_and_translations( + uint32_t ir_node_id, + clp::ffi::SchemaTree::Node const& ir_node_to_add, + NodeType archive_node_type, + int32_t parent_node_id + ) -> int; + + /** + * Gets the archive node ID for an IR node. + * @param ir_node_id ID of the IR node + * @param archive_node_type Type of the archive node + * @param ir_tree The IR schema tree + */ + auto get_archive_node_id( + uint32_t ir_node_id, + NodeType archive_node_type, + clp::ffi::SchemaTree const& ir_tree + ) -> int; + + /** + * Parses a Key Value Log Event. + * @param kv the Key Value Log Event + */ + void parse_kv_log_event(KeyValuePairLogEvent const& kv); + /** * Parses an array within a JSON line * @param line the JSON array @@ -123,6 +182,9 @@ class JsonParser { size_t m_max_document_size; bool m_structurize_arrays{false}; bool m_record_log_order{true}; + + absl::flat_hash_map, int32_t> + m_ir_node_to_archive_node_id_mapping; }; } // namespace clp_s diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 2c6639290..0f7b5643a 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -115,11 +115,10 @@ bool compress(CommandLineArguments const& command_line_arguments) { clp_s::JsonParser parser(option); if (CommandLineArguments::FileType::KeyValueIr == option.input_file_type) { - // Functionality Coming in later PR - // -->Call new parsing function in Json Parser to parse IRv2 to archive - // -->Check for error from parsing function - SPDLOG_ERROR("Compressing Key Value IR Files is not yet supported"); - return false; + if (false == parser.parse_from_ir()) { + SPDLOG_ERROR("Encountered error while parsing input"); + return false; + } } else { if (false == parser.parse()) { SPDLOG_ERROR("Encountered error while parsing input");