Skip to content

Commit

Permalink
Refactor StreamReader to modularize decoding logic. (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
junhaoliao authored Oct 31, 2024
1 parent e4fda3e commit 9e82372
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 51 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ target_include_directories(

target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/)

set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp)
set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/decoding_methods.cpp
src/clp_ffi_js/ir/StreamReader.cpp
)

set(CLP_FFI_JS_SRC_CLP_CORE
src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp
Expand Down
76 changes: 30 additions & 46 deletions src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ir/LogEventDeserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
Expand All @@ -27,6 +26,7 @@

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/decoding_methods.hpp>
#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

Expand All @@ -48,51 +48,8 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader {
auto zstd_decompressor{std::make_unique<clp::streaming_compression::zstd::Decompressor>()};
zstd_decompressor->open(data_buffer.data(), length);

bool is_four_bytes_encoding{true};
if (auto const err{
clp::ffi::ir_stream::get_encoding_type(*zstd_decompressor, is_four_bytes_encoding)
};
clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err)
{
SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err);
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_MetadataCorrupted,
__FILENAME__,
__LINE__,
"Failed to decode encoding type."
};
}
if (false == is_four_bytes_encoding) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
"IR stream uses unsupported encoding."
};
}

auto result{
clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>::create(*zstd_decompressor)
};
if (result.has_error()) {
auto const error_code{result.error()};
SPDLOG_CRITICAL(
"Failed to create deserializer: {}:{}",
error_code.category().name(),
error_code.message()
);
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
"Failed to create deserializer"
};
}

StreamReaderDataContext<four_byte_encoded_variable_t> stream_reader_data_context{
std::move(data_buffer),
std::move(zstd_decompressor),
std::move(result.value())
auto stream_reader_data_context{
create_data_context(std::move(zstd_decompressor), std::move(data_buffer))
};
return StreamReader{std::move(stream_reader_data_context)};
}
Expand Down Expand Up @@ -251,6 +208,33 @@ StreamReader::StreamReader(
std::move(stream_reader_data_context)
)},
m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {}

auto StreamReader::create_data_context(
std::unique_ptr<clp::streaming_compression::zstd::Decompressor>&& zstd_decompressor,
clp::Array<char> data_buffer
) -> StreamReaderDataContext<four_byte_encoded_variable_t> {
rewind_reader_and_validate_encoding_type(*zstd_decompressor);

auto result{
clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>::create(*zstd_decompressor)
};
if (result.has_error()) {
auto const error_code{result.error()};
SPDLOG_CRITICAL(
"Failed to create deserializer: {}:{}",
error_code.category().name(),
error_code.message()
);
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
"Failed to create deserializer"
};
}

return {std::move(data_buffer), std::move(zstd_decompressor), std::move(result.value())};
}
} // namespace clp_ffi_js::ir

namespace {
Expand Down
20 changes: 16 additions & 4 deletions src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#ifndef CLP_FFI_JS_IR_STREAM_READER_HPP
#define CLP_FFI_JS_IR_STREAM_READER_HPP

#include <Array.hpp>
#include <cstddef>
#include <memory>
#include <optional>
#include <streaming_compression/zstd/Decompressor.hpp>
#include <vector>

#include <clp/ir/types.hpp>
Expand All @@ -15,6 +17,8 @@
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

namespace clp_ffi_js::ir {
using clp::ir::four_byte_encoded_variable_t;

EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType);
Expand Down Expand Up @@ -52,6 +56,7 @@ class StreamReader {
// Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`.
auto operator=(StreamReader&&) -> StreamReader& = delete;

// Methods
/**
* @return The number of events buffered.
*/
Expand Down Expand Up @@ -97,12 +102,19 @@ class StreamReader {

private:
// Constructor
explicit StreamReader(StreamReaderDataContext<clp::ir::four_byte_encoded_variable_t>&&
stream_reader_data_context);
explicit StreamReader(
StreamReaderDataContext<four_byte_encoded_variable_t>&& stream_reader_data_context
);

// Methods
[[nodiscard]] static auto create_data_context(
std::unique_ptr<clp::streaming_compression::zstd::Decompressor>&& zstd_decompressor,
clp::Array<char> data_buffer
) -> StreamReaderDataContext<four_byte_encoded_variable_t>;

// Variables
std::vector<LogEventWithLevel<clp::ir::four_byte_encoded_variable_t>> m_encoded_log_events;
std::unique_ptr<StreamReaderDataContext<clp::ir::four_byte_encoded_variable_t>>
std::vector<LogEventWithLevel<four_byte_encoded_variable_t>> m_encoded_log_events;
std::unique_ptr<StreamReaderDataContext<four_byte_encoded_variable_t>>
m_stream_reader_data_context;
FilteredLogEventsMap m_filtered_log_event_map;
clp::TimestampPattern m_ts_pattern;
Expand Down
36 changes: 36 additions & 0 deletions src/clp_ffi_js/ir/decoding_methods.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include "decoding_methods.hpp"

#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ReaderInterface.hpp>
#include <clp/TraceableException.hpp>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>

namespace clp_ffi_js::ir {
auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void {
reader.seek_from_begin(0);

bool is_four_bytes_encoding{true};
if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)};
clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err)
{
SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err);
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_MetadataCorrupted,
__FILENAME__,
__LINE__,
"Failed to decode encoding type."
};
}
if (false == is_four_bytes_encoding) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
"IR stream uses unsupported encoding."
};
}
}
} // namespace clp_ffi_js::ir
16 changes: 16 additions & 0 deletions src/clp_ffi_js/ir/decoding_methods.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#ifndef CLP_FFI_JS_IR_DECODING_METHODS_HPP
#define CLP_FFI_JS_IR_DECODING_METHODS_HPP

#include <clp/ReaderInterface.hpp>

namespace clp_ffi_js::ir {
/**
* Rewinds the reader to the beginning and validates the CLP IR data encoding type.
* @param reader
* @throws ClpFfiJsException if the encoding type couldn't be decoded or the encoding type is
* unsupported.
*/
auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void;
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_IR_DECODING_METHODS_HPP

0 comments on commit 9e82372

Please sign in to comment.