Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for UTC offset serialization/deserialization. #8

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/ffi_go/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ typedef struct {
typedef struct {
StringView m_log_message;
epoch_time_ms_t m_timestamp;
epoch_time_ms_t m_utc_offset;
} LogEventView;

// NOLINTEND(modernize-use-using)
Expand Down
172 changes: 123 additions & 49 deletions cpp/src/ffi_go/ir/deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <clp/ir/types.hpp>
#include <clp/string_utils/string_utils.hpp>
#include <clp/time_types.hpp>

#include "ffi_go/api_decoration.h"
#include "ffi_go/defs.h"
Expand All @@ -26,12 +27,15 @@
namespace ffi_go::ir {
using clp::BufferReader;
using clp::ffi::ir_stream::cProtocol::Eof;
using clp::ffi::ir_stream::cProtocol::Payload::UtcOffsetChange;
using clp::ffi::ir_stream::deserialize_preamble;
using clp::ffi::ir_stream::deserialize_tag;
using clp::ffi::ir_stream::encoded_tag_t;
using clp::ffi::ir_stream::get_encoding_type;
using clp::ffi::ir_stream::IRErrorCode;
using clp::ir::eight_byte_encoded_variable_t;
using clp::ir::four_byte_encoded_variable_t;
using clp::UtcOffset;

namespace {
/**
Expand Down Expand Up @@ -59,6 +63,41 @@ template <class encoded_variable_t>
size_t* matching_query
) -> int;

/**
* Tries to deserialize UTC offset changes until the next IR packet doesn't indicate a UTC offset
* change.
* @param ir_buf
* @param tag Outputs the tag after deserializing UTC offset changes.
* @param deserializer The deserialized of the current IR stream. The underlying UTC offset will be
* updated by the last deserialized UTC offset on success.
* @return IRErrorCode::IRErrorCode_Success on success.
* @return IRErrorCode::IRErrorCode_Incomplete_IR if the reader doesn't contain enough data to
* deserialize.
*/
[[nodiscard]] auto try_deserialize_utc_offset_changes(
BufferReader& ir_buf,
encoded_tag_t& tag,
Deserializer* deserializer
) -> IRErrorCode;

/**
* Tries to deserialize the next log event from the IR buffer.
* @tparam encoded_variable_t
* @param ir_buf
* @param tag
* @param deserializer The deserializer of the current IR stream to buffer the deserialized
* log event.
* @return The return code of `clp::ffi::ir_stream::four_byte_encoding::deserialize_log_event` or
* `clp::ffi::ir_stream::eight_byte_encoding::deserialize_log_event`, depending on the encoded type
* `encoded_variable_t`.
*/
template <class encoded_variable_t>
[[nodiscard]] auto try_deserialize_next_log_event(
BufferReader& ir_buf,
encoded_tag_t tag,
Deserializer* deserializer
) -> IRErrorCode;

template <class encoded_variable_t>
auto deserialize_log_event(
ByteSpan ir_view,
Expand All @@ -72,39 +111,25 @@ auto deserialize_log_event(
BufferReader ir_buf{static_cast<char const*>(ir_view.m_data), ir_view.m_size};
Deserializer* deserializer{static_cast<Deserializer*>(ir_deserializer)};

clp::ffi::ir_stream::encoded_tag_t tag{};
encoded_tag_t tag{};
if (auto const err{deserialize_tag(ir_buf, tag)}; IRErrorCode::IRErrorCode_Success != err) {
return static_cast<int>(err);
}
if (auto const err{try_deserialize_utc_offset_changes(ir_buf, tag, deserializer)};
IRErrorCode::IRErrorCode_Success != err)
{
return static_cast<int>(err);
}
if (Eof == tag) {
return static_cast<int>(IRErrorCode::IRErrorCode_Eof);
}

IRErrorCode err{};
epoch_time_ms_t timestamp{};
if constexpr (std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>) {
err = clp::ffi::ir_stream::eight_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp
);
} else if constexpr (std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>) {
epoch_time_ms_t timestamp_delta{};
err = clp::ffi::ir_stream::four_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp_delta
);
timestamp = deserializer->m_timestamp + timestamp_delta;
} else {
static_assert(cAlwaysFalse<encoded_variable_t>, "Invalid/unhandled encoding type");
}
if (IRErrorCode::IRErrorCode_Success != err) {
if (auto const err{try_deserialize_next_log_event<encoded_variable_t>(ir_buf, tag, deserializer)
};
IRErrorCode::IRErrorCode_Success != err)
{
return static_cast<int>(err);
}
deserializer->m_timestamp = timestamp;

size_t pos{0};
if (clp::ErrorCode_Success != ir_buf.try_get_pos(pos)) {
Expand All @@ -114,6 +139,7 @@ auto deserialize_log_event(
log_event->m_log_message.m_data = deserializer->m_log_event.m_log_message.data();
log_event->m_log_message.m_size = deserializer->m_log_event.m_log_message.size();
log_event->m_timestamp = deserializer->m_timestamp;
log_event->m_utc_offset = static_cast<epoch_time_ms_t>(deserializer->m_utc_offset.count());
return static_cast<int>(IRErrorCode::IRErrorCode_Success);
}

Expand Down Expand Up @@ -173,40 +199,27 @@ auto deserialize_wildcard_match(
query_fn = [](ffi_go::LogMessage const&) -> std::pair<bool, size_t> { return {true, 0}; };
}

IRErrorCode err{};
while (true) {
clp::ffi::ir_stream::encoded_tag_t tag{};
if (err = deserialize_tag(ir_buf, tag); IRErrorCode::IRErrorCode_Success != err) {
encoded_tag_t tag{};
if (auto const err{deserialize_tag(ir_buf, tag)}; IRErrorCode::IRErrorCode_Success != err) {
return static_cast<int>(err);
}
if (auto const err{try_deserialize_utc_offset_changes(ir_buf, tag, deserializer)};
IRErrorCode::IRErrorCode_Success != err)
{
return static_cast<int>(IRErrorCode::IRErrorCode_Eof);
}
if (Eof == tag) {
return static_cast<int>(IRErrorCode::IRErrorCode_Eof);
}

epoch_time_ms_t timestamp{};
if constexpr (std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>) {
err = clp::ffi::ir_stream::eight_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp
);
} else if constexpr (std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>) {
epoch_time_ms_t timestamp_delta{};
err = clp::ffi::ir_stream::four_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp_delta
);
timestamp = deserializer->m_timestamp + timestamp_delta;
} else {
static_assert(cAlwaysFalse<encoded_variable_t>, "Invalid/unhandled encoding type");
}
if (IRErrorCode::IRErrorCode_Success != err) {
if (auto const err{
try_deserialize_next_log_event<encoded_variable_t>(ir_buf, tag, deserializer)
};
IRErrorCode::IRErrorCode_Success != err)
{
return static_cast<int>(err);
}
deserializer->m_timestamp = timestamp;

if (time_interval.m_upper <= deserializer->m_timestamp) {
// TODO this is an extremely fragile hack until the CLP ffi ir
Expand All @@ -231,10 +244,71 @@ auto deserialize_wildcard_match(
log_event->m_log_message.m_data = deserializer->m_log_event.m_log_message.data();
log_event->m_log_message.m_size = deserializer->m_log_event.m_log_message.size();
log_event->m_timestamp = deserializer->m_timestamp;
log_event->m_utc_offset = static_cast<epoch_time_ms_t>(deserializer->m_utc_offset.count());
*matching_query = matching_query_idx;
return static_cast<int>(IRErrorCode::IRErrorCode_Success);
}
}

auto try_deserialize_utc_offset_changes(
BufferReader& ir_buf,
encoded_tag_t& tag,
Deserializer* deserializer
) -> IRErrorCode {
if (UtcOffsetChange != tag) {
return IRErrorCode::IRErrorCode_Success;
}

UtcOffset utc_offset{0};
while (UtcOffsetChange == tag) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets just simplify this by taking out the while loop and drop the try part. If we find multiple offset change packets in a row it will get handled by the next log event while loop anyway, so lets just keep this function as simple as possible.

if (auto const err{clp::ffi::ir_stream::deserialize_utc_offset_change(ir_buf, utc_offset)})
{
return err;
}
if (auto const err{deserialize_tag(ir_buf, tag)}) {
return err;
}
}
deserializer->m_utc_offset = utc_offset;
return IRErrorCode::IRErrorCode_Success;
}

template <class encoded_variable_t>
auto try_deserialize_next_log_event(
BufferReader& ir_buf,
encoded_tag_t tag,
Deserializer* deserializer
) -> IRErrorCode {
static_assert(
(std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>
|| std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>)
);

epoch_time_ms_t timestamp{};
IRErrorCode err{};
if constexpr (std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>) {
err = clp::ffi::ir_stream::eight_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp
);
} else {
epoch_time_ms_t timestamp_delta{};
err = clp::ffi::ir_stream::four_byte_encoding::deserialize_log_event(
ir_buf,
tag,
deserializer->m_log_event.m_log_message,
timestamp_delta
);
timestamp = deserializer->m_timestamp + timestamp_delta;
}

if (IRErrorCode::IRErrorCode_Success == err) {
deserializer->m_timestamp = timestamp;
}
return err;
}
} // namespace

CLP_FFI_GO_METHOD auto ir_deserializer_close(void* ir_deserializer) -> void {
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/ffi_go/ir/serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ffi/ir_stream/encoding_methods.hpp>
#include <clp/ir/types.hpp>
#include <clp/time_types.hpp>

#include "ffi_go/api_decoration.h"
#include "ffi_go/defs.h"
Expand Down Expand Up @@ -196,4 +197,20 @@ CLP_FFI_GO_METHOD auto ir_serializer_serialize_four_byte_log_event(
ir_view
);
}

CLP_FFI_GO_METHOD auto ir_serializer_serialize_utc_offset_change(
epoch_time_ms_t utc_offset_change,
void* ir_serializer,
ByteSpan* ir_view
) -> void {
Serializer* serializer{static_cast<Serializer*>(ir_serializer)};
clp::UtcOffset const utc_offset{utc_offset_change};
serializer->m_ir_buf.clear();
if (utc_offset != serializer->m_curr_utc_offset) {
clp::ffi::ir_stream::serialize_utc_offset_change(utc_offset, serializer->m_ir_buf);
serializer->m_curr_utc_offset = utc_offset;
}
ir_view->m_data = serializer->m_ir_buf.data();
ir_view->m_size = serializer->m_ir_buf.size();
}
} // namespace ffi_go::ir
14 changes: 14 additions & 0 deletions cpp/src/ffi_go/ir/serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,19 @@ CLP_FFI_GO_METHOD int ir_serializer_serialize_four_byte_log_event(
ByteSpan* ir_view
);

/**
* Given an UTC offset, serialize it into an IR byte stream. An ir::Serializer must be provided to
* use as the backing storage for the corresponding Go ir.Serializer. All pointer parameters must be
* non-null (non-nil Cgo C.<type> pointer or unsafe.Pointer from Go)
* @param[in] utc_offset_change UTC offset change to serialize in milliseconds
* @param[in] ir_serializer ir::Serializer object to be used as storage
* @param[out] ir_view View of a IR buffer containing the serialized log event
*/
CLP_FFI_GO_METHOD void ir_serializer_serialize_utc_offset_change(
epoch_time_ms_t utc_offset_change,
void* ir_serializer,
ByteSpan* ir_view
);

// NOLINTEND(modernize-use-trailing-return-type)
#endif // FFI_GO_IR_SERIALIZER_H
3 changes: 3 additions & 0 deletions cpp/src/ffi_go/ir/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <vector>

#include <clp/ir/types.hpp>
#include <clp/time_types.hpp>

#include "ffi_go/types.hpp"

Expand Down Expand Up @@ -52,6 +53,7 @@ struct Encoder {
struct Deserializer {
ffi_go::LogEventStorage m_log_event;
clp::ir::epoch_time_ms_t m_timestamp{};
clp::UtcOffset m_utc_offset{0};
};

/**
Expand All @@ -74,6 +76,7 @@ struct Serializer {

std::string m_logtype;
std::vector<int8_t> m_ir_buf;
clp::UtcOffset m_curr_utc_offset{0};
};
} // namespace ffi_go::ir

Expand Down
1 change: 1 addition & 0 deletions ffi/ffi.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ type LogEvent struct {
type LogEventView struct {
LogMessageView
Timestamp EpochTimeMs
UtcOffset EpochTimeMs
}
1 change: 1 addition & 0 deletions include/ffi_go/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ typedef struct {
typedef struct {
StringView m_log_message;
epoch_time_ms_t m_timestamp;
epoch_time_ms_t m_utc_offset;
} LogEventView;

// NOLINTEND(modernize-use-using)
Expand Down
14 changes: 14 additions & 0 deletions include/ffi_go/ir/serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,19 @@ CLP_FFI_GO_METHOD int ir_serializer_serialize_four_byte_log_event(
ByteSpan* ir_view
);

/**
* Given an UTC offset, serialize it into an IR byte stream. An ir::Serializer must be provided to
* use as the backing storage for the corresponding Go ir.Serializer. All pointer parameters must be
* non-null (non-nil Cgo C.<type> pointer or unsafe.Pointer from Go)
* @param[in] utc_offset_change UTC offset change to serialize in milliseconds
* @param[in] ir_serializer ir::Serializer object to be used as storage
* @param[out] ir_view View of a IR buffer containing the serialized log event
*/
CLP_FFI_GO_METHOD void ir_serializer_serialize_utc_offset_change(
epoch_time_ms_t utc_offset_change,
void* ir_serializer,
ByteSpan* ir_view
);

// NOLINTEND(modernize-use-trailing-return-type)
#endif // FFI_GO_IR_SERIALIZER_H
4 changes: 3 additions & 1 deletion ir/deserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Deserializer interface {
}

// DeserializePreamble attempts to read an IR stream preamble from irBuf,
// returning an Deserializer (of the correct stream encoding size), the position
// returning a Deserializer (of the correct stream encoding size), the position
// read to in irBuf (the end of the preamble), and an error. Note the metadata
// stored in the preamble is sparse and certain fields in TimestampInfo may be 0
// value. On error returns:
Expand Down Expand Up @@ -256,6 +256,7 @@ func deserializeLogEvent(
event.m_log_message.m_size,
),
Timestamp: ffi.EpochTimeMs(event.m_timestamp),
UtcOffset: ffi.EpochTimeMs(event.m_utc_offset),
},
int(pos),
nil
Expand Down Expand Up @@ -307,6 +308,7 @@ func deserializeWildcardMatch(
event.m_log_message.m_size,
),
Timestamp: ffi.EpochTimeMs(event.m_timestamp),
UtcOffset: ffi.EpochTimeMs(event.m_utc_offset),
},
int(pos),
int(match),
Expand Down
Loading
Loading