Skip to content

Commit

Permalink
ffi: Add support for serializing msgpack map into KV-pair IR format. (y…
Browse files Browse the repository at this point in the history
  • Loading branch information
LinZhihao-723 authored Jul 17, 2024
1 parent 862fccf commit eebbf1e
Show file tree
Hide file tree
Showing 8 changed files with 747 additions and 22 deletions.
460 changes: 460 additions & 0 deletions components/core/src/clp/ffi/ir_stream/Serializer.cpp

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions components/core/src/clp/ffi/ir_stream/Serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@

#include <cstdint>
#include <span>
#include <string>
#include <vector>

#include <boost-outcome/include/boost/outcome/std_result.hpp>
#include <msgpack.hpp>

#include "../../time_types.hpp"
#include "../SchemaTree.hpp"
#include "../SchemaTreeNode.hpp"

namespace clp::ffi::ir_stream {
/**
Expand Down Expand Up @@ -79,14 +82,47 @@ class Serializer {
*/
auto change_utc_offset(UtcOffset utc_offset) -> void;

/**
* Serializes the given msgpack map as a key-value pair log event.
* @param msgpack_map
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto serialize_msgpack_map(msgpack::object_map const& msgpack_map) -> bool;

private:
// Constructors
Serializer() = default;

// Methods
/**
* Serializes a schema tree node identified by the given locator into `m_schema_tree_node_buf`.
* @param locator
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto serialize_schema_tree_node(SchemaTree::NodeLocator const& locator) -> bool;

/**
* Serializes the given key ID into `m_key_group_buf`.
* @param id
* @return true on success.
* @return false if the ID exceeds the representable range.
*/
[[nodiscard]] auto serialize_key(SchemaTreeNode::id_t id) -> bool;

/**
* Serializes the given MessagePack value into `m_value_group_buf`.
* @param val
* @param schema_tree_node_type The type of the schema tree node that corresponds to `val`.
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto
serialize_val(msgpack::object const& val, SchemaTreeNode::Type schema_tree_node_type) -> bool;

UtcOffset m_curr_utc_offset{0};
Buffer m_ir_buf;
SchemaTree m_schema_tree;

std::string m_logtype_buf;
Buffer m_schema_tree_node_buf;
Buffer m_key_group_buf;
Buffer m_value_group_buf;
Expand Down
26 changes: 17 additions & 9 deletions components/core/src/clp/ffi/ir_stream/encoding_methods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,22 @@ bool serialize_log_event(
string_view message,
string& logtype,
vector<int8_t>& ir_buf
) {
if (false == serialize_message(message, logtype, ir_buf)) {
return false;
}

// Encode timestamp
ir_buf.push_back(cProtocol::Payload::TimestampVal);
serialize_int(timestamp, ir_buf);

return true;
}

bool serialize_message(
std::string_view message,
std::string& logtype,
std::vector<int8_t>& ir_buf
) {
auto encoded_var_handler = [&ir_buf](eight_byte_encoded_variable_t encoded_var) {
ir_buf.push_back(cProtocol::Payload::VarEightByteEncoding);
Expand All @@ -153,15 +169,7 @@ bool serialize_log_event(
return false;
}

if (false == serialize_logtype(logtype, ir_buf)) {
return false;
}

// Encode timestamp
ir_buf.push_back(cProtocol::Payload::TimestampVal);
serialize_int(timestamp, ir_buf);

return true;
return serialize_logtype(logtype, ir_buf);
}
} // namespace eight_byte_encoding

Expand Down
9 changes: 9 additions & 0 deletions components/core/src/clp/ffi/ir_stream/encoding_methods.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ bool serialize_log_event(
std::string& logtype,
std::vector<int8_t>& ir_buf
);

/**
* Serializes the given message into the eight-byte encoding IR stream.
* @param message
* @param logtype Returns the message's logtype.
* @param ir_buf
* @return Whether the message was serialized successfully.
*/
bool serialize_message(std::string_view message, std::string& logtype, std::vector<int8_t>& ir_buf);
} // namespace eight_byte_encoding

namespace four_byte_encoding {
Expand Down
29 changes: 29 additions & 0 deletions components/core/src/clp/ffi/ir_stream/protocol_constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,35 @@ constexpr int8_t TimestampDeltaInt = 0x33;
constexpr int8_t TimestampDeltaLong = 0x34;

constexpr int8_t UtcOffsetChange = 0x3F;

constexpr int8_t StrLenUByte = 0x41;
constexpr int8_t StrLenUShort = 0x42;
constexpr int8_t StrLenUInt = 0x43;

constexpr int8_t ValueInt8 = 0x51;
constexpr int8_t ValueInt16 = 0x52;
constexpr int8_t ValueInt32 = 0x53;
constexpr int8_t ValueInt64 = 0x54;
constexpr int8_t ValueFloat = 0x56;
constexpr int8_t ValueTrue = 0x57;
constexpr int8_t ValueFalse = 0x58;
constexpr int8_t ValueFourByteEncodingClpStr = 0x59;
constexpr int8_t ValueEightByteEncodingClpStr = 0x5A;
constexpr int8_t ValueEmpty = 0x5E;
constexpr int8_t ValueNull = 0x5F;

constexpr int8_t SchemaTreeNodeParentIdUByte = 0x60;
constexpr int8_t SchemaTreeNodeParentIdUShort = 0x61;

constexpr int8_t KeyIdUByte = 0x65;
constexpr int8_t KeyIdUShort = 0x66;

constexpr int8_t SchemaTreeNodeInt = 0x71;
constexpr int8_t SchemaTreeNodeFloat = 0x72;
constexpr int8_t SchemaTreeNodeBool = 0x73;
constexpr int8_t SchemaTreeNodeStr = 0x74;
constexpr int8_t SchemaTreeNodeUnstructuredArray = 0x75;
constexpr int8_t SchemaTreeNodeObj = 0x76;
} // namespace Payload

constexpr int8_t FourByteEncodingMagicNumber[]
Expand Down
34 changes: 27 additions & 7 deletions components/core/src/clp/ffi/ir_stream/utils.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "utils.hpp"

#include <cstdint>
#include <string_view>
#include <vector>

#include <json/single_include/nlohmann/json.hpp>
Expand All @@ -9,24 +10,43 @@
#include "protocol_constants.hpp"

namespace clp::ffi::ir_stream {
auto serialize_metadata(nlohmann::json& metadata, std::vector<int8_t>& ir_buf) -> bool {
ir_buf.push_back(cProtocol::Metadata::EncodingJson);
auto serialize_metadata(nlohmann::json& metadata, std::vector<int8_t>& output_buf) -> bool {
output_buf.push_back(cProtocol::Metadata::EncodingJson);

auto const metadata_serialized
= metadata.dump(-1, ' ', false, nlohmann::json::error_handler_t::ignore);
auto const metadata_serialized_length = metadata_serialized.length();
if (metadata_serialized_length <= UINT8_MAX) {
ir_buf.push_back(cProtocol::Metadata::LengthUByte);
ir_buf.push_back(bit_cast<int8_t>(static_cast<uint8_t>(metadata_serialized_length)));
output_buf.push_back(cProtocol::Metadata::LengthUByte);
output_buf.push_back(bit_cast<int8_t>(static_cast<uint8_t>(metadata_serialized_length)));
} else if (metadata_serialized_length <= UINT16_MAX) {
ir_buf.push_back(cProtocol::Metadata::LengthUShort);
serialize_int(static_cast<uint16_t>(metadata_serialized_length), ir_buf);
output_buf.push_back(cProtocol::Metadata::LengthUShort);
serialize_int(static_cast<uint16_t>(metadata_serialized_length), output_buf);
} else {
// Can't encode metadata longer than 64 KiB
return false;
}
ir_buf.insert(ir_buf.cend(), metadata_serialized.cbegin(), metadata_serialized.cend());
output_buf.insert(output_buf.cend(), metadata_serialized.cbegin(), metadata_serialized.cend());

return true;
}

auto serialize_string(std::string_view str, std::vector<int8_t>& output_buf) -> bool {
auto const length{str.length()};
if (length <= UINT8_MAX) {
output_buf.push_back(cProtocol::Payload::StrLenUByte);
output_buf.push_back(bit_cast<int8_t>(static_cast<uint8_t>(length)));
} else if (length <= UINT16_MAX) {
output_buf.push_back(cProtocol::Payload::StrLenUShort);
serialize_int(static_cast<uint16_t>(length), output_buf);
} else if (length <= UINT32_MAX) {
output_buf.push_back(cProtocol::Payload::StrLenUInt);
serialize_int(static_cast<uint32_t>(length), output_buf);
} else {
// Out of range
return false;
}
output_buf.insert(output_buf.cend(), str.cbegin(), str.cend());
return true;
}
} // namespace clp::ffi::ir_stream
61 changes: 55 additions & 6 deletions components/core/src/clp/ffi/ir_stream/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,61 @@

#include <cstdint>
#include <span>
#include <string>
#include <string_view>
#include <vector>

#include <json/single_include/nlohmann/json.hpp>

#include "../../ir/types.hpp"
#include "byteswap.hpp"
#include "encoding_methods.hpp"
#include "protocol_constants.hpp"

namespace clp::ffi::ir_stream {
/**
* Serializes the given metadata into the IR stream.
* @param metadata
* @param ir_buf
* @param output_buf
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto
serialize_metadata(nlohmann::json& metadata, std::vector<int8_t>& ir_buf) -> bool;
serialize_metadata(nlohmann::json& metadata, std::vector<int8_t>& output_buf) -> bool;

/**
* Serializes the given integer into the IR stream.
* @tparam integer_t
* @param value
* @param ir_buf
* @param output_buf
*/
template <typename integer_t>
auto serialize_int(integer_t value, std::vector<int8_t>& ir_buf) -> void;
auto serialize_int(integer_t value, std::vector<int8_t>& output_buf) -> void;

/**
* Serializes a string using CLP's encoding for unstructured text.
* @tparam encoded_variable_t
* @param str
* @param logtype Returns the corresponding logtype.
* @param output_buf
* @return Whether serialization succeeded.
*/
template <typename encoded_variable_t>
[[nodiscard]] auto serialize_clp_string(
std::string_view str,
std::string& logtype,
std::vector<int8_t>& output_buf
) -> bool;

/**
* Serializes a string.
* @param str
* @param output_buf
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto serialize_string(std::string_view str, std::vector<int8_t>& output_buf) -> bool;

template <typename integer_t>
auto serialize_int(integer_t value, std::vector<int8_t>& ir_buf) -> void {
auto serialize_int(integer_t value, std::vector<int8_t>& output_buf) -> void {
integer_t value_big_endian{};
static_assert(sizeof(integer_t) == 2 || sizeof(integer_t) == 4 || sizeof(integer_t) == 8);
if constexpr (sizeof(value) == 2) {
Expand All @@ -41,7 +69,28 @@ auto serialize_int(integer_t value, std::vector<int8_t>& ir_buf) -> void {
}
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
std::span<int8_t> const data_view{reinterpret_cast<int8_t*>(&value_big_endian), sizeof(value)};
ir_buf.insert(ir_buf.end(), data_view.begin(), data_view.end());
output_buf.insert(output_buf.end(), data_view.begin(), data_view.end());
}

template <typename encoded_variable_t>
[[nodiscard]] auto serialize_clp_string(
std::string_view str,
std::string& logtype,
std::vector<int8_t>& output_buf
) -> bool {
static_assert(
(std::is_same_v<encoded_variable_t, clp::ir::eight_byte_encoded_variable_t>
|| std::is_same_v<encoded_variable_t, clp::ir::four_byte_encoded_variable_t>)
);
bool succeeded{};
if constexpr (std::is_same_v<encoded_variable_t, clp::ir::four_byte_encoded_variable_t>) {
output_buf.push_back(cProtocol::Payload::ValueFourByteEncodingClpStr);
succeeded = four_byte_encoding::serialize_message(str, logtype, output_buf);
} else {
output_buf.push_back(cProtocol::Payload::ValueEightByteEncodingClpStr);
succeeded = eight_byte_encoding::serialize_message(str, logtype, output_buf);
}
return succeeded;
}
} // namespace clp::ffi::ir_stream
#endif
Loading

0 comments on commit eebbf1e

Please sign in to comment.