From ea7678fca67b6f560667dda9f7ac7b7e723b6ff7 Mon Sep 17 00:00:00 2001 From: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com> Date: Sun, 25 Aug 2024 16:55:40 -0400 Subject: [PATCH] ffi: Add `Deserializer` class to deserialize log events from key-value pair IR format. (#511) Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- components/core/CMakeLists.txt | 2 + .../src/clp/ffi/ir_stream/Deserializer.cpp | 734 ++++++++++++++++++ .../src/clp/ffi/ir_stream/Deserializer.hpp | 73 ++ .../clp/ffi/ir_stream/decoding_methods.cpp | 94 +-- .../clp/ffi/ir_stream/decoding_methods.hpp | 21 + .../clp/ffi/ir_stream/protocol_constants.hpp | 2 + .../core/src/clp/ffi/ir_stream/utils.hpp | 33 + .../core/tests/test-ir_encoding_methods.cpp | 61 +- 8 files changed, 973 insertions(+), 47 deletions(-) create mode 100644 components/core/src/clp/ffi/ir_stream/Deserializer.cpp create mode 100644 components/core/src/clp/ffi/ir_stream/Deserializer.hpp diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 1d93bc27b..c4f84570c 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -314,6 +314,8 @@ set(SOURCE_FILES_unitTest src/clp/ffi/encoding_methods.hpp src/clp/ffi/encoding_methods.inc src/clp/ffi/ir_stream/byteswap.hpp + src/clp/ffi/ir_stream/Deserializer.cpp + src/clp/ffi/ir_stream/Deserializer.hpp src/clp/ffi/ir_stream/decoding_methods.cpp src/clp/ffi/ir_stream/decoding_methods.hpp src/clp/ffi/ir_stream/decoding_methods.inc diff --git a/components/core/src/clp/ffi/ir_stream/Deserializer.cpp b/components/core/src/clp/ffi/ir_stream/Deserializer.cpp new file mode 100644 index 000000000..453d58bcf --- /dev/null +++ b/components/core/src/clp/ffi/ir_stream/Deserializer.cpp @@ -0,0 +1,734 @@ +#include "Deserializer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "../../ErrorCode.hpp" +#include "../../ir/EncodedTextAst.hpp" +#include "../../ir/types.hpp" +#include "../../ReaderInterface.hpp" +#include "../../time_types.hpp" +#include "../../type_utils.hpp" +#include "../KeyValuePairLogEvent.hpp" +#include "../SchemaTree.hpp" +#include "../SchemaTreeNode.hpp" +#include "../Value.hpp" +#include "decoding_methods.hpp" +#include "protocol_constants.hpp" +#include "utils.hpp" + +namespace clp::ffi::ir_stream { +namespace { +/** + * A collection of schema tree leaf node IDs. It represents the schema of a `KeyValuePairLogEvent`. + */ +using Schema = std::vector; + +/** + * Class to perform different actions depending on whether a transaction succeeds or fails. The + * default state assumes the transaction fails. + * @tparam SuccessHandler A cleanup lambda to call on success. + * @tparam FailureHandler A cleanup lambda to call on failure. + */ +template +requires(std::is_invocable_v && std::is_invocable_v) +class TransactionManager { +public: + // Constructor + TransactionManager(SuccessHandler success_handler, FailureHandler failure_handler) + : m_success_handler{success_handler}, + m_failure_handler{failure_handler} {} + + // Delete copy/move constructor and assignment + TransactionManager(TransactionManager const&) = delete; + TransactionManager(TransactionManager&&) = delete; + auto operator=(TransactionManager const&) -> TransactionManager& = delete; + auto operator=(TransactionManager&&) -> TransactionManager& = delete; + + // Destructor + ~TransactionManager() { + if (m_success) { + m_success_handler(); + } else { + m_failure_handler(); + } + } + + // Methods + /** + * Marks the transaction as successful. + */ + auto mark_success() -> void { m_success = true; } + +private: + // Variables + SuccessHandler m_success_handler; + FailureHandler m_failure_handler; + bool m_success{false}; +}; + +/** + * @param ir_error_code + * @return Equivalent `std::errc` code indicating the same error type. + */ +[[nodiscard]] auto ir_error_code_to_errc(IRErrorCode ir_error_code) -> std::errc; + +/** + * @param tag + * @return Whether the tag represents a schema tree node. + */ +[[nodiscard]] auto is_schema_tree_node_tag(encoded_tag_t tag) -> bool; + +/** + * @param tag + * @return The corresponding schema tree node type on success. + * @return std::nullopt if the tag doesn't match to any defined schema tree node type. + */ +[[nodiscard]] auto schema_tree_node_tag_to_type(encoded_tag_t tag +) -> std::optional; + +/** + * Deserializes the parent ID of a schema tree node. + * @param reader + * @param parent_id Returns the deserialized result. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return IRErrorCode::IRErrorCode_Incomplete_IR if the stream is truncated. + * @return IRErrorCode::IRErrorCode_Corrupted_IR if the next packet in the stream isn't a parent ID. + * @return Same as `deserialize_tag` on any other failure. + */ +[[nodiscard]] auto deserialize_schema_tree_node_parent_id( + ReaderInterface& reader, + SchemaTreeNode::id_t& parent_id +) -> IRErrorCode; + +/** + * Deserializes the key name of a schema tree node. + * @param reader + * @param key_name Returns the deserialized key name. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return Same as `deserialize_tag` or `deserialize_string` on failure. + */ +[[nodiscard]] auto deserialize_schema_tree_node_key_name( + ReaderInterface& reader, + std::string& key_name +) -> IRErrorCode; + +/** + * Deserializes an integer value packet. + * @param reader + * @param tag + * @param val Returns the deserialized value. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return IRErrorCode::IRErrorCode_Incomplete_IR if the stream is truncated. + * @return IRErrorCode::IRErrorCode_Corrupted_IR if the given tag doesn't correspond to an integer + * packet. + */ +[[nodiscard]] auto +deserialize_int_val(ReaderInterface& reader, encoded_tag_t tag, value_int_t& val) -> IRErrorCode; + +/** + * Deserializes a string packet. + * @param reader + * @param tag + * @param deserialized_str Returns the deserialized string. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return IRErrorCode::IRErrorCode_Incomplete_IR if the stream is truncated. + * @return IRErrorCode::IRErrorCode_Corrupted_IR if the given tag doesn't correspond to a string + * packet. + */ +[[nodiscard]] auto deserialize_string( + ReaderInterface& reader, + encoded_tag_t tag, + std::string& deserialized_str +) -> IRErrorCode; + +/** + * Deserializes all UTC offset packets until a non-UTC offset packet tag is read. + * @param reader + * @param tag Takes the current tag as input and returns the last tag read. + * @param utc_offset Returns the deserialized UTC offset. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return Same as `deserialize_utc_offset_change` or `deserialize_tag` on failure. + */ +[[nodiscard]] auto deserialize_utc_offset_changes( + ReaderInterface& reader, + encoded_tag_t& tag, + UtcOffset& utc_offset +) -> IRErrorCode; + +/** + * Deserializes all schema tree node packets and inserts them into the schema tree until a non- + * schema tree node tag is read. + * @param reader + * @param tag Takes the current tag as input and returns the last tag read. + * @param schema_tree Returns the schema tree with all new nodes inserted. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return IRErrorCode::IRErrorCode_Corrupted_IR if the packet tag doesn't correspond to any known + * schema node type or the node being deserialized already exists in the current in-memory schema + * tree. + * @return Same as `deserialize_schema_tree_node_parent_id`, `deserialize_string`, or + * `deserialize_tag` on any other failure. + */ +[[nodiscard]] auto deserialize_schema_tree_nodes( + ReaderInterface& reader, + encoded_tag_t& tag, + SchemaTree& schema_tree +) -> IRErrorCode; + +/** + * Deserializes the IDs of all keys in a log event. + * @param reader + * @param tag Takes the current tag as input and returns the last tag read. + * @param schema Returns the deserialized schema. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return IRErrorCode::IRErrorCode_Incomplete_IR if the stream is truncated. + * @return Same as `deserialize_tag` on any other failure. + */ +[[nodiscard]] auto +deserialize_schema(ReaderInterface& reader, encoded_tag_t& tag, Schema& schema) -> IRErrorCode; + +/** + * Deserializes the next value and pushes the result into `node_id_value_pairs`. + * @param reader + * @param tag + * @param node_id The node ID that corresponds to the value. + * @param node_id_value_pairs Returns the ID-value pair constructed from the deserialized value. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return IRErrorCode::IRErrorCode_Incomplete_IR if the stream is truncated. + * @return IRErrorCode::IRErrorCode_Corrupted_IR if the tag doesn't correspond to any known value + * type. + * @return Same as `deserialize_encoded_text_ast_and_insert_to_node_id_value_pairs` on any other + * failure. + */ +[[nodiscard]] auto deserialize_value_and_insert_to_node_id_value_pairs( + ReaderInterface& reader, + encoded_tag_t tag, + SchemaTreeNode::id_t node_id, + KeyValuePairLogEvent::NodeIdValuePairs& node_id_value_pairs +) -> IRErrorCode; + +/** + * Deserializes an encoded text AST and pushes the result into node_id_value_pairs. + * @tparam encoded_variable_t + * @param reader + * @param node_id The node ID that corresponds to the value. + * @param node_id_value_pairs Returns the ID-value pair constructed by the deserialized encoded text + * AST. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return Same as `deserialize_tag` or `deserialize_encoded_text_ast` on failure. + */ +template +requires(std::is_same_v + || std::is_same_v) +[[nodiscard]] auto deserialize_encoded_text_ast_and_insert_to_node_id_value_pairs( + ReaderInterface& reader, + SchemaTreeNode::id_t node_id, + KeyValuePairLogEvent::NodeIdValuePairs& node_id_value_pairs +) -> IRErrorCode; + +/** + * Deserializes values and constructs ID-value pairs according to the given schema. The number of + * values to deserialize is indicated by the size of the given schema. + * @param reader + * @param tag + * @param schema The log event's schema. + * @param node_id_value_pairs Returns the constructed ID-value pairs. + * @return IRErrorCode::IRErrorCode_Success on success. + * @return IRErrorCode::IRErrorCode_Corrupted_IR if a key is duplicated in the deserialized log + * event. + * @return Same as `deserialize_tag` or `deserialize_value_and_insert_to_node_id_value_pairs` on any + * other failure. + */ +[[nodiscard]] auto deserialize_value_and_construct_node_id_value_pairs( + ReaderInterface& reader, + encoded_tag_t tag, + Schema const& schema, + KeyValuePairLogEvent::NodeIdValuePairs& node_id_value_pairs +) -> IRErrorCode; + +auto ir_error_code_to_errc(IRErrorCode ir_error_code) -> std::errc { + switch (ir_error_code) { + case IRErrorCode_Incomplete_IR: + return std::errc::result_out_of_range; + case IRErrorCode_Corrupted_IR: + case IRErrorCode_Decode_Error: + return std::errc::protocol_error; + case IRErrorCode_Eof: + return std::errc::no_message_available; + default: + return std::errc::not_supported; + } +} + +auto is_schema_tree_node_tag(encoded_tag_t tag) -> bool { + return (tag & cProtocol::Payload::SchemaTreeNodeMask) == cProtocol::Payload::SchemaTreeNodeMask; +} + +auto schema_tree_node_tag_to_type(encoded_tag_t tag) -> std::optional { + switch (tag) { + case cProtocol::Payload::SchemaTreeNodeInt: + return SchemaTreeNode::Type::Int; + case cProtocol::Payload::SchemaTreeNodeFloat: + return SchemaTreeNode::Type::Float; + case cProtocol::Payload::SchemaTreeNodeBool: + return SchemaTreeNode::Type::Bool; + case cProtocol::Payload::SchemaTreeNodeStr: + return SchemaTreeNode::Type::Str; + case cProtocol::Payload::SchemaTreeNodeUnstructuredArray: + return SchemaTreeNode::Type::UnstructuredArray; + case cProtocol::Payload::SchemaTreeNodeObj: + return SchemaTreeNode::Type::Obj; + default: + return std::nullopt; + } +} + +auto deserialize_schema_tree_node_parent_id( + ReaderInterface& reader, + SchemaTreeNode::id_t& parent_id +) -> IRErrorCode { + encoded_tag_t tag{}; + if (auto const err{deserialize_tag(reader, tag)}; IRErrorCode::IRErrorCode_Success != err) { + return err; + } + if (cProtocol::Payload::SchemaTreeNodeParentIdUByte == tag) { + uint8_t deserialized_id{}; + if (false == deserialize_int(reader, deserialized_id)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + parent_id = static_cast(deserialized_id); + } else if (cProtocol::Payload::SchemaTreeNodeParentIdUShort == tag) { + uint16_t deserialized_id{}; + if (false == deserialize_int(reader, deserialized_id)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + parent_id = static_cast(deserialized_id); + } else { + return IRErrorCode::IRErrorCode_Corrupted_IR; + } + return IRErrorCode_Success; +} + +auto deserialize_schema_tree_node_key_name(ReaderInterface& reader, std::string& key_name) + -> IRErrorCode { + encoded_tag_t str_packet_tag{}; + if (auto const err{deserialize_tag(reader, str_packet_tag)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + if (auto const err{deserialize_string(reader, str_packet_tag, key_name)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + return IRErrorCode::IRErrorCode_Success; +} + +auto deserialize_int_val(ReaderInterface& reader, encoded_tag_t tag, value_int_t& val) + -> IRErrorCode { + if (cProtocol::Payload::ValueInt8 == tag) { + int8_t deserialized_val{}; + if (false == deserialize_int(reader, deserialized_val)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + // NOLINTNEXTLINE(bugprone-signed-char-misuse,cert-str34-c) + val = deserialized_val; + } else if (cProtocol::Payload::ValueInt16 == tag) { + int16_t deserialized_val{}; + if (false == deserialize_int(reader, deserialized_val)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + val = deserialized_val; + } else if (cProtocol::Payload::ValueInt32 == tag) { + int32_t deserialized_val{}; + if (false == deserialize_int(reader, deserialized_val)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + val = deserialized_val; + } else if (cProtocol::Payload::ValueInt64 == tag) { + int64_t deserialized_val{}; + if (false == deserialize_int(reader, deserialized_val)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + val = deserialized_val; + } else { + return IRErrorCode::IRErrorCode_Corrupted_IR; + } + return IRErrorCode::IRErrorCode_Success; +} + +auto deserialize_string(ReaderInterface& reader, encoded_tag_t tag, std::string& deserialized_str) + -> IRErrorCode { + size_t str_length{}; + if (cProtocol::Payload::StrLenUByte == tag) { + uint8_t length{}; + if (false == deserialize_int(reader, length)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + str_length = static_cast(length); + } else if (cProtocol::Payload::StrLenUShort == tag) { + uint16_t length{}; + if (false == deserialize_int(reader, length)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + str_length = static_cast(length); + } else if (cProtocol::Payload::StrLenUInt == tag) { + uint32_t length{}; + if (false == deserialize_int(reader, length)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + str_length = static_cast(length); + } else { + return IRErrorCode::IRErrorCode_Corrupted_IR; + } + if (clp::ErrorCode_Success != reader.try_read_string(str_length, deserialized_str)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + return IRErrorCode::IRErrorCode_Success; +} + +auto deserialize_utc_offset_changes( + ReaderInterface& reader, + encoded_tag_t& tag, + UtcOffset& utc_offset +) -> IRErrorCode { + while (cProtocol::Payload::UtcOffsetChange == tag) { + if (auto const err{deserialize_utc_offset_change(reader, utc_offset)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + if (auto const err{deserialize_tag(reader, tag)}; IRErrorCode::IRErrorCode_Success != err) { + return err; + } + } + return IRErrorCode::IRErrorCode_Success; +} + +auto deserialize_schema_tree_nodes( + ReaderInterface& reader, + encoded_tag_t& tag, + SchemaTree& schema_tree +) -> IRErrorCode { + while (is_schema_tree_node_tag(tag)) { + auto const type{schema_tree_node_tag_to_type(tag)}; + if (false == type.has_value()) { + return IRErrorCode::IRErrorCode_Corrupted_IR; + } + + SchemaTreeNode::id_t parent_id{}; + if (auto const err{deserialize_schema_tree_node_parent_id(reader, parent_id)}; + IRErrorCode_Success != err) + { + return err; + } + + std::string key_name; + if (auto const err{deserialize_schema_tree_node_key_name(reader, key_name)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + + // Insert the node to the schema tree + SchemaTree::NodeLocator const locator{parent_id, key_name, type.value()}; + if (schema_tree.has_node(locator)) { + return IRErrorCode::IRErrorCode_Corrupted_IR; + } + std::ignore = schema_tree.insert_node(locator); + + // Read the next tag + if (auto const err{deserialize_tag(reader, tag)}; IRErrorCode::IRErrorCode_Success != err) { + return err; + } + } + return IRErrorCode::IRErrorCode_Success; +} + +auto deserialize_schema(ReaderInterface& reader, encoded_tag_t& tag, Schema& schema) + -> IRErrorCode { + schema.clear(); + while (true) { + if (cProtocol::Payload::KeyIdUByte == tag) { + uint8_t id{}; + if (false == deserialize_int(reader, id)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + schema.push_back(static_cast(id)); + } else if (cProtocol::Payload::KeyIdUShort == tag) { + uint16_t id{}; + if (false == deserialize_int(reader, id)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + schema.push_back(static_cast(id)); + } else { + break; + } + + if (auto const err{deserialize_tag(reader, tag)}; IRErrorCode::IRErrorCode_Success != err) { + return err; + } + } + + return IRErrorCode::IRErrorCode_Success; +} + +auto deserialize_value_and_insert_to_node_id_value_pairs( + ReaderInterface& reader, + encoded_tag_t tag, + SchemaTreeNode::id_t node_id, + KeyValuePairLogEvent::NodeIdValuePairs& node_id_value_pairs +) -> IRErrorCode { + switch (tag) { + case cProtocol::Payload::ValueInt8: + case cProtocol::Payload::ValueInt16: + case cProtocol::Payload::ValueInt32: + case cProtocol::Payload::ValueInt64: { + value_int_t value_int{}; + if (auto const err{deserialize_int_val(reader, tag, value_int)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + node_id_value_pairs.emplace(node_id, Value{value_int}); + break; + } + case cProtocol::Payload::ValueFloat: { + uint64_t val{}; + if (false == deserialize_int(reader, val)) { + return IRErrorCode::IRErrorCode_Incomplete_IR; + } + node_id_value_pairs.emplace(node_id, Value{bit_cast(val)}); + break; + } + case cProtocol::Payload::ValueTrue: + node_id_value_pairs.emplace(node_id, Value{true}); + break; + case cProtocol::Payload::ValueFalse: + node_id_value_pairs.emplace(node_id, Value{false}); + break; + case cProtocol::Payload::StrLenUByte: + case cProtocol::Payload::StrLenUShort: + case cProtocol::Payload::StrLenUInt: { + std::string value_str; + if (auto const err{deserialize_string(reader, tag, value_str)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + node_id_value_pairs.emplace(node_id, Value{std::move(value_str)}); + break; + } + case cProtocol::Payload::ValueEightByteEncodingClpStr: + if (auto const err{deserialize_encoded_text_ast_and_insert_to_node_id_value_pairs< + ir::eight_byte_encoded_variable_t>(reader, node_id, node_id_value_pairs)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + break; + case cProtocol::Payload::ValueFourByteEncodingClpStr: + if (auto const err{deserialize_encoded_text_ast_and_insert_to_node_id_value_pairs< + ir::four_byte_encoded_variable_t>(reader, node_id, node_id_value_pairs)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + break; + case cProtocol::Payload::ValueNull: + node_id_value_pairs.emplace(node_id, Value{}); + break; + case cProtocol::Payload::ValueEmpty: + node_id_value_pairs.emplace(node_id, std::nullopt); + break; + default: + return IRErrorCode::IRErrorCode_Corrupted_IR; + } + return IRErrorCode::IRErrorCode_Success; +} + +template +requires(std::is_same_v + || std::is_same_v) +[[nodiscard]] auto deserialize_encoded_text_ast_and_insert_to_node_id_value_pairs( + ReaderInterface& reader, + SchemaTreeNode::id_t node_id, + KeyValuePairLogEvent::NodeIdValuePairs& node_id_value_pairs +) -> IRErrorCode { + encoded_tag_t tag{}; + if (auto const err{deserialize_tag(reader, tag)}; IRErrorCode::IRErrorCode_Success != err) { + return err; + } + + std::string logtype; + std::vector encoded_vars; + std::vector dict_vars; + if (auto const err{deserialize_encoded_text_ast(reader, tag, logtype, encoded_vars, dict_vars)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + + node_id_value_pairs.emplace( + node_id, + Value{ir::EncodedTextAst{logtype, dict_vars, encoded_vars}} + ); + return IRErrorCode::IRErrorCode_Success; +} + +auto deserialize_value_and_construct_node_id_value_pairs( + ReaderInterface& reader, + encoded_tag_t tag, + Schema const& schema, + KeyValuePairLogEvent::NodeIdValuePairs& node_id_value_pairs +) -> IRErrorCode { + node_id_value_pairs.clear(); + node_id_value_pairs.reserve(schema.size()); + for (auto const node_id : schema) { + if (node_id_value_pairs.contains(node_id)) { + // The key should be unique in a schema + return IRErrorCode_Corrupted_IR; + } + + if (auto const err{deserialize_value_and_insert_to_node_id_value_pairs( + reader, + tag, + node_id, + node_id_value_pairs + )}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + + if (schema.size() != node_id_value_pairs.size()) { + if (auto const err{deserialize_tag(reader, tag)}; + IRErrorCode::IRErrorCode_Success != err) + { + return err; + } + } + } + return IRErrorCode::IRErrorCode_Success; +} +} // namespace + +auto Deserializer::create(ReaderInterface& reader +) -> OUTCOME_V2_NAMESPACE::std_result { + bool is_four_byte_encoded{}; + if (auto const err{get_encoding_type(reader, is_four_byte_encoded)}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + + std::vector metadata; + encoded_tag_t metadata_type{}; + if (auto const err{deserialize_preamble(reader, metadata_type, metadata)}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + + if (cProtocol::Metadata::EncodingJson != metadata_type) { + return std::errc::protocol_not_supported; + } + + auto metadata_json = nlohmann::json::parse(metadata, nullptr, false); + if (metadata_json.is_discarded()) { + return std::errc::protocol_error; + } + auto const version_iter{metadata_json.find(cProtocol::Metadata::VersionKey)}; + if (metadata_json.end() == version_iter || false == version_iter->is_string()) { + return std::errc::protocol_error; + } + auto const version = version_iter->get_ref(); + // TODO: Just before the KV-pair IR format is formally released, we should replace this + // hard-coded version check with `ffi::ir_stream::validate_protocol_version`. + if (std::string_view{static_cast(cProtocol::Metadata::BetaVersionValue)} + != version) + { + return std::errc::protocol_not_supported; + } + + return Deserializer{}; +} + +auto Deserializer::deserialize_to_next_log_event(clp::ReaderInterface& reader +) -> OUTCOME_V2_NAMESPACE::std_result { + auto const utc_offset_snapshot{m_utc_offset}; + m_schema_tree->take_snapshot(); + TransactionManager revert_manager{ + []() -> void {}, + [&]() -> void { + m_utc_offset = utc_offset_snapshot; + m_schema_tree->revert(); + } + }; + + encoded_tag_t tag{}; + if (auto const err{deserialize_tag(reader, tag)}; IRErrorCode::IRErrorCode_Success != err) { + return ir_error_code_to_errc(err); + } + + if (auto const err{deserialize_utc_offset_changes(reader, tag, m_utc_offset)}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + + if (auto const err{deserialize_schema_tree_nodes(reader, tag, *m_schema_tree)}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + + Schema schema; + if (auto const err{deserialize_schema(reader, tag, schema)}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + + KeyValuePairLogEvent::NodeIdValuePairs node_id_value_pairs; + if (false == schema.empty()) { + if (auto const err{deserialize_value_and_construct_node_id_value_pairs( + reader, + tag, + schema, + node_id_value_pairs + )}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + } else { + if (cProtocol::Payload::ValueEmpty != tag) { + return ir_error_code_to_errc(IRErrorCode::IRErrorCode_Corrupted_IR); + } + } + + auto result{KeyValuePairLogEvent::create( + m_schema_tree, + std::move(node_id_value_pairs), + m_utc_offset + )}; + if (false == result.has_error()) { + revert_manager.mark_success(); + } + + return std::move(result); +} +} // namespace clp::ffi::ir_stream diff --git a/components/core/src/clp/ffi/ir_stream/Deserializer.hpp b/components/core/src/clp/ffi/ir_stream/Deserializer.hpp new file mode 100644 index 000000000..c7327fd5b --- /dev/null +++ b/components/core/src/clp/ffi/ir_stream/Deserializer.hpp @@ -0,0 +1,73 @@ +#ifndef CLP_FFI_IR_STREAM_DESERIALIZER_HPP +#define CLP_FFI_IR_STREAM_DESERIALIZER_HPP + +#include + +#include + +#include "../../ReaderInterface.hpp" +#include "../../time_types.hpp" +#include "../KeyValuePairLogEvent.hpp" +#include "../SchemaTree.hpp" + +namespace clp::ffi::ir_stream { +/** + * A deserializer for log events from a CLP kv-pair IR stream. The class ensures any internal state + * remains consistent even when a deserialization failure occurs (i.e., it's transactional). + * + * NOTE: This class is designed only to provide deserialization functionalities. Callers are + * responsible for maintaining a `ReaderInterface` to input IR bytes from an I/O stream. + */ +class Deserializer { +public: + // Factory function + /** + * Creates a deserializer by reading the stream's preamble from the given reader. + * @param reader + * @return A result containing the deserializer or an error code indicating the failure: + * - std::errc::result_out_of_range if the IR stream is truncated + * - std::errc::protocol_error if the IR stream is corrupted + * - std::errc::protocol_not_supported if the IR stream contains an unsupported metadata format + * or uses an unsupported version + */ + [[nodiscard]] static auto create(ReaderInterface& reader + ) -> OUTCOME_V2_NAMESPACE::std_result; + + // Delete copy constructor and assignment + Deserializer(Deserializer const&) = delete; + auto operator=(Deserializer const&) -> Deserializer& = delete; + + // Define default move constructor and assignment + Deserializer(Deserializer&&) = default; + auto operator=(Deserializer&&) -> Deserializer& = default; + + // Destructor + ~Deserializer() = default; + + // Methods + /** + * Deserializes the stream from the given reader up to and including the next log event. + * @param reader + * @return A result containing the deserialized log event or an error code indicating the + * failure: + * - std::errc::result_out_of_range if the IR stream is truncated + * - std::errc::protocol_error if the IR stream is corrupted + * - std::errc::protocol_not_supported if the IR stream contains an unsupported metadata format + * or uses an unsupported version + * - Same as `KeyValuePairLogEvent::create` if the intermediate deserialized result cannot + * construct a valid key-value pair log event + */ + [[nodiscard]] auto deserialize_to_next_log_event(ReaderInterface& reader + ) -> OUTCOME_V2_NAMESPACE::std_result; + +private: + // Constructor + Deserializer() = default; + + // Variables + std::shared_ptr m_schema_tree{std::make_shared()}; + UtcOffset m_utc_offset{0}; +}; +} // namespace clp::ffi::ir_stream + +#endif // CLP_FFI_IR_STREAM_DESERIALIZER_HPP diff --git a/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp b/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp index 29f57df0a..9388470e4 100644 --- a/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp +++ b/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp @@ -5,6 +5,7 @@ #include "../../ir/types.hpp" #include "byteswap.hpp" #include "protocol_constants.hpp" +#include "utils.hpp" using clp::ir::eight_byte_encoded_variable_t; using clp::ir::epoch_time_ms_t; @@ -24,16 +25,6 @@ namespace clp::ffi::ir_stream { template static bool is_variable_tag(encoded_tag_t tag, bool& is_encoded_var); -/** - * Deserializes an integer from the given reader - * @tparam integer_t Type of the integer to deserialize - * @param reader - * @param value Returns the deserialized integer - * @return true on success, false if the reader doesn't contain enough data to deserialize - */ -template -static bool deserialize_int(ReaderInterface& reader, integer_t& value); - /** * Deserializes a logtype from the given reader * @param reader @@ -138,27 +129,6 @@ static bool is_variable_tag(encoded_tag_t tag, bool& is_encoded_var) { return false; } -template -static bool deserialize_int(ReaderInterface& reader, integer_t& value) { - integer_t value_little_endian; - if (reader.try_read_numeric_value(value_little_endian) != ErrorCode_Success) { - return false; - } - - constexpr auto read_size = sizeof(integer_t); - static_assert(read_size == 1 || read_size == 2 || read_size == 4 || read_size == 8); - if constexpr (read_size == 1) { - value = value_little_endian; - } else if constexpr (read_size == 2) { - value = bswap_16(value_little_endian); - } else if constexpr (read_size == 4) { - value = bswap_32(value_little_endian); - } else if constexpr (read_size == 8) { - value = bswap_64(value_little_endian); - } - return true; -} - static IRErrorCode deserialize_logtype(ReaderInterface& reader, encoded_tag_t encoded_tag, string& logtype) { size_t logtype_length; @@ -362,6 +332,38 @@ auto deserialize_log_event( vector& encoded_vars, vector& dict_vars, epoch_time_ms_t& timestamp_or_timestamp_delta +) -> IRErrorCode { + if (auto const err + = deserialize_encoded_text_ast(reader, encoded_tag, logtype, encoded_vars, dict_vars); + IRErrorCode_Success != err) + { + return err; + } + + // NOTE: for the eight-byte encoding, the timestamp is the actual timestamp; for the four-byte + // encoding, the timestamp is a timestamp delta + if (ErrorCode_Success != reader.try_read_numeric_value(encoded_tag)) { + return IRErrorCode_Incomplete_IR; + } + if (auto error_code = deserialize_timestamp( + reader, + encoded_tag, + timestamp_or_timestamp_delta + ); + IRErrorCode_Success != error_code) + { + return error_code; + } + return IRErrorCode_Success; +} + +template +auto deserialize_encoded_text_ast( + ReaderInterface& reader, + encoded_tag_t encoded_tag, + std::string& logtype, + std::vector& encoded_vars, + std::vector& dict_vars ) -> IRErrorCode { // Handle variables string var_str; @@ -393,20 +395,6 @@ auto deserialize_log_event( return error_code; } - // NOTE: for the eight-byte encoding, the timestamp is the actual timestamp; for the four-byte - // encoding, the timestamp is a timestamp delta - if (ErrorCode_Success != reader.try_read_numeric_value(encoded_tag)) { - return IRErrorCode_Incomplete_IR; - } - if (auto error_code = deserialize_timestamp( - reader, - encoded_tag, - timestamp_or_timestamp_delta - ); - IRErrorCode_Success != error_code) - { - return error_code; - } return IRErrorCode_Success; } @@ -568,4 +556,20 @@ template auto deserialize_log_event( vector& dict_vars, epoch_time_ms_t& timestamp_or_timestamp_delta ) -> IRErrorCode; + +template auto deserialize_encoded_text_ast( + ReaderInterface& reader, + encoded_tag_t encoded_tag, + std::string& logtype, + std::vector& encoded_vars, + std::vector& dict_vars +) -> IRErrorCode; + +template auto deserialize_encoded_text_ast( + ReaderInterface& reader, + encoded_tag_t encoded_tag, + std::string& logtype, + std::vector& encoded_vars, + std::vector& dict_vars +) -> IRErrorCode; } // namespace clp::ffi::ir_stream diff --git a/components/core/src/clp/ffi/ir_stream/decoding_methods.hpp b/components/core/src/clp/ffi/ir_stream/decoding_methods.hpp index 982b9c7aa..fb6f6a3c0 100644 --- a/components/core/src/clp/ffi/ir_stream/decoding_methods.hpp +++ b/components/core/src/clp/ffi/ir_stream/decoding_methods.hpp @@ -89,6 +89,27 @@ auto deserialize_log_event( ir::epoch_time_ms_t& timestamp_or_timestamp_delta ) -> IRErrorCode; +/** + * Deserializes an encoded text AST from the given stream + * @tparam encoded_variable_t + * @param reader + * @param encoded_tag Tag of the next packet to read + * @param logtype Returns the logtype + * @param encoded_vars Returns the encoded variables + * @param dict_vars Returns the dictionary variables + * @return IRErrorCode_Success on success + * @return IRErrorCode_Corrupted_IR if `reader` contains invalid IR + * @return IRErrorCode_Incomplete_IR if `reader` doesn't contain enough data + */ +template +auto deserialize_encoded_text_ast( + ReaderInterface& reader, + encoded_tag_t encoded_tag, + std::string& logtype, + std::vector& encoded_vars, + std::vector& dict_vars +) -> IRErrorCode; + /** * Decodes the IR message calls the given methods to handle each component of the message * @tparam unescape_logtype Whether to remove the escape characters from the logtype before calling diff --git a/components/core/src/clp/ffi/ir_stream/protocol_constants.hpp b/components/core/src/clp/ffi/ir_stream/protocol_constants.hpp index 9f84cc4e1..915a8a56d 100644 --- a/components/core/src/clp/ffi/ir_stream/protocol_constants.hpp +++ b/components/core/src/clp/ffi/ir_stream/protocol_constants.hpp @@ -73,6 +73,8 @@ constexpr int8_t SchemaTreeNodeParentIdUShort = 0x61; constexpr int8_t KeyIdUByte = 0x65; constexpr int8_t KeyIdUShort = 0x66; +constexpr int8_t SchemaTreeNodeMask = 0x70; + constexpr int8_t SchemaTreeNodeInt = 0x71; constexpr int8_t SchemaTreeNodeFloat = 0x72; constexpr int8_t SchemaTreeNodeBool = 0x73; diff --git a/components/core/src/clp/ffi/ir_stream/utils.hpp b/components/core/src/clp/ffi/ir_stream/utils.hpp index 7879c3f74..b816c6385 100644 --- a/components/core/src/clp/ffi/ir_stream/utils.hpp +++ b/components/core/src/clp/ffi/ir_stream/utils.hpp @@ -9,7 +9,9 @@ #include +#include "../../ErrorCode.hpp" #include "../../ir/types.hpp" +#include "../../ReaderInterface.hpp" #include "byteswap.hpp" #include "encoding_methods.hpp" #include "protocol_constants.hpp" @@ -33,6 +35,16 @@ serialize_metadata(nlohmann::json& metadata, std::vector& output_buf) -> template auto serialize_int(integer_t value, std::vector& output_buf) -> void; +/** + * Deserializes an integer from the given reader + * @tparam integer_t Type of the integer to deserialize + * @param reader + * @param value Returns the deserialized integer + * @return Whether the reader contained enough data to deserialize. + */ +template +[[nodiscard]] auto deserialize_int(ReaderInterface& reader, integer_t& value) -> bool; + /** * Serializes a string using CLP's encoding for unstructured text. * @tparam encoded_variable_t @@ -72,6 +84,27 @@ auto serialize_int(integer_t value, std::vector& output_buf) -> void { output_buf.insert(output_buf.end(), data_view.begin(), data_view.end()); } +template +auto deserialize_int(ReaderInterface& reader, integer_t& value) -> bool { + integer_t value_little_endian; + if (reader.try_read_numeric_value(value_little_endian) != clp::ErrorCode_Success) { + return false; + } + + constexpr auto cReadSize = sizeof(integer_t); + static_assert(cReadSize == 1 || cReadSize == 2 || cReadSize == 4 || cReadSize == 8); + if constexpr (cReadSize == 1) { + value = value_little_endian; + } else if constexpr (cReadSize == 2) { + value = bswap_16(value_little_endian); + } else if constexpr (cReadSize == 4) { + value = bswap_32(value_little_endian); + } else if constexpr (cReadSize == 8) { + value = bswap_64(value_little_endian); + } + return true; +} + template [[nodiscard]] auto serialize_clp_string( std::string_view str, diff --git a/components/core/tests/test-ir_encoding_methods.cpp b/components/core/tests/test-ir_encoding_methods.cpp index e9b161b8a..6eec92673 100644 --- a/components/core/tests/test-ir_encoding_methods.cpp +++ b/components/core/tests/test-ir_encoding_methods.cpp @@ -14,6 +14,7 @@ #include "../src/clp/ErrorCode.hpp" #include "../src/clp/ffi/encoding_methods.hpp" #include "../src/clp/ffi/ir_stream/decoding_methods.hpp" +#include "../src/clp/ffi/ir_stream/Deserializer.hpp" #include "../src/clp/ffi/ir_stream/encoding_methods.hpp" #include "../src/clp/ffi/ir_stream/protocol_constants.hpp" #include "../src/clp/ffi/ir_stream/Serializer.hpp" @@ -34,6 +35,7 @@ using clp::ffi::ir_stream::cProtocol::MagicNumberLength; using clp::ffi::ir_stream::deserialize_preamble; using clp::ffi::ir_stream::deserialize_tag; using clp::ffi::ir_stream::deserialize_utc_offset_change; +using clp::ffi::ir_stream::Deserializer; using clp::ffi::ir_stream::encoded_tag_t; using clp::ffi::ir_stream::get_encoding_type; using clp::ffi::ir_stream::IRErrorCode; @@ -150,6 +152,14 @@ template Serializer& serializer ) -> bool; +/** + * Counts the number of leaves in a JSON tree. A node is considered as a leaf if it's a primitive + * value, an empty map (`{}`), or an array. + * @param root + * @return The number of leaves under the given root. + */ +[[nodiscard]] auto count_num_leaves(nlohmann::json const& root) -> size_t; + template [[nodiscard]] auto serialize_log_events( vector const& log_events, @@ -275,6 +285,30 @@ auto unpack_and_serialize_msgpack_bytes( } return serializer.serialize_msgpack_map(msgpack_obj.via.map); } + +// NOLINTNEXTLINE(misc-no-recursion) +auto count_num_leaves(nlohmann::json const& root) -> size_t { + if (false == root.is_object()) { + return 0; + } + + size_t num_leaves{0}; + for (auto const& [key, val] : root.items()) { + if (val.is_primitive() || val.is_array()) { + ++num_leaves; + } else if (val.is_object()) { + if (val.empty()) { + ++num_leaves; + } else { + num_leaves += count_num_leaves(val); + } + } else { + FAIL("Unknown JSON object types."); + } + } + + return num_leaves; +} } // namespace /** @@ -1006,15 +1040,15 @@ TEMPLATE_TEST_CASE( ); } +// NOLINTNEXTLINE(readability-function-cognitive-complexity) TEMPLATE_TEST_CASE( "ffi_ir_stream_Serializer_serialize_msgpack", "[clp][ffi][ir_stream][Serializer]", four_byte_encoded_variable_t, eight_byte_encoded_variable_t ) { - // TODO: Test deserializing the serialized bytes once a KV-pair IR deserializer is implemented. - vector ir_buf; + vector serialized_json_objects; auto result{Serializer::create()}; REQUIRE((false == result.has_error())); @@ -1024,6 +1058,7 @@ TEMPLATE_TEST_CASE( auto const empty_obj = nlohmann::json::parse("{}"); REQUIRE(unpack_and_serialize_msgpack_bytes(nlohmann::json::to_msgpack(empty_obj), serializer)); + serialized_json_objects.emplace_back(empty_obj); // Test encoding basic object constexpr string_view cShortString{"short_string"}; @@ -1050,6 +1085,7 @@ TEMPLATE_TEST_CASE( {"empty_array", empty_array}}; REQUIRE(unpack_and_serialize_msgpack_bytes(nlohmann::json::to_msgpack(basic_obj), serializer)); + serialized_json_objects.emplace_back(basic_obj); auto basic_array = empty_array; basic_array.emplace_back(1); @@ -1083,5 +1119,26 @@ TEMPLATE_TEST_CASE( nlohmann::json::to_msgpack(recursive_obj), serializer )); + serialized_json_objects.emplace_back(recursive_obj); + } + + flush_and_clear_serializer_buffer(serializer, ir_buf); + + // Deserialize the results + BufferReader reader{size_checked_pointer_cast(ir_buf.data()), ir_buf.size()}; + auto deserializer_result = Deserializer::create(reader); + REQUIRE_FALSE(deserializer_result.has_error()); + auto& deserializer = deserializer_result.value(); + + for (auto const& json_obj : serialized_json_objects) { + auto const kv_log_event_result = deserializer.deserialize_to_next_log_event(reader); + REQUIRE_FALSE(kv_log_event_result.has_error()); + auto const& kv_log_event = kv_log_event_result.value(); + auto const num_leaves_in_json_obj = count_num_leaves(json_obj); + auto const num_kv_pairs = kv_log_event.get_node_id_value_pairs().size(); + REQUIRE((num_leaves_in_json_obj == num_kv_pairs)); } + + // TODO: Test validating the deserialized bytes once we've implemented a KeyValuePairLogEvent to + // JSON deserializer. }