Skip to content

Commit

Permalink
Refactor StreamReader to modularize decoding logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
junhaoliao committed Oct 15, 2024
1 parent e4fda3e commit 3dae806
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 51 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ target_include_directories(

target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/)

set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp)
set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/decoding_methods.cpp
src/clp_ffi_js/ir/StreamReader.cpp
)

set(CLP_FFI_JS_SRC_CLP_CORE
src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp
Expand Down
77 changes: 31 additions & 46 deletions src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ir/LogEventDeserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
Expand All @@ -27,6 +26,7 @@

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/decoding_methods.hpp>
#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

Expand All @@ -48,52 +48,10 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader {
auto zstd_decompressor{std::make_unique<clp::streaming_compression::zstd::Decompressor>()};
zstd_decompressor->open(data_buffer.data(), length);

bool is_four_bytes_encoding{true};
if (auto const err{
clp::ffi::ir_stream::get_encoding_type(*zstd_decompressor, is_four_bytes_encoding)
};
clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err)
{
SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err);
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_MetadataCorrupted,
__FILENAME__,
__LINE__,
"Failed to decode encoding type."
};
}
if (false == is_four_bytes_encoding) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
"IR stream uses unsupported encoding."
};
}

auto result{
clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>::create(*zstd_decompressor)
};
if (result.has_error()) {
auto const error_code{result.error()};
SPDLOG_CRITICAL(
"Failed to create deserializer: {}:{}",
error_code.category().name(),
error_code.message()
);
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
"Failed to create deserializer"
};
}

StreamReaderDataContext<four_byte_encoded_variable_t> stream_reader_data_context{
std::move(data_buffer),
auto stream_reader_data_context{create_deserializer_and_data_context(
std::move(zstd_decompressor),
std::move(result.value())
};
std::move(data_buffer)
)};
return StreamReader{std::move(stream_reader_data_context)};
}

Expand Down Expand Up @@ -251,6 +209,33 @@ StreamReader::StreamReader(
std::move(stream_reader_data_context)
)},
m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {}

auto StreamReader::create_deserializer_and_data_context(
std::unique_ptr<clp::streaming_compression::zstd::Decompressor>&& zstd_decompressor,
clp::Array<char>&& data_buffer
) -> StreamReaderDataContext<four_byte_encoded_variable_t> {
rewind_reader_and_verify_encoding_type(*zstd_decompressor);

auto result{
clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>::create(*zstd_decompressor)
};
if (result.has_error()) {
auto const error_code{result.error()};
SPDLOG_CRITICAL(
"Failed to create deserializer: {}:{}",
error_code.category().name(),
error_code.message()
);
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
"Failed to create deserializer"
};
}

return {std::move(data_buffer), std::move(zstd_decompressor), std::move(result.value())};
}
} // namespace clp_ffi_js::ir

namespace {
Expand Down
19 changes: 15 additions & 4 deletions src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#ifndef CLP_FFI_JS_IR_STREAM_READER_HPP
#define CLP_FFI_JS_IR_STREAM_READER_HPP

#include <Array.hpp>
#include <cstddef>
#include <memory>
#include <optional>
#include <streaming_compression/zstd/Decompressor.hpp>
#include <vector>

#include <clp/ir/types.hpp>
Expand All @@ -14,6 +16,8 @@
#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

using clp::ir::four_byte_encoded_variable_t;

namespace clp_ffi_js::ir {
EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType);
Expand Down Expand Up @@ -97,12 +101,19 @@ class StreamReader {

private:
// Constructor
explicit StreamReader(StreamReaderDataContext<clp::ir::four_byte_encoded_variable_t>&&
stream_reader_data_context);
explicit StreamReader(
StreamReaderDataContext<four_byte_encoded_variable_t>&& stream_reader_data_context
);

// Methods
[[nodiscard]] static auto create_deserializer_and_data_context(
std::unique_ptr<clp::streaming_compression::zstd::Decompressor>&& zstd_decompressor,
clp::Array<char>&& data_buffer
) -> StreamReaderDataContext<four_byte_encoded_variable_t>;

// Variables
std::vector<LogEventWithLevel<clp::ir::four_byte_encoded_variable_t>> m_encoded_log_events;
std::unique_ptr<StreamReaderDataContext<clp::ir::four_byte_encoded_variable_t>>
std::vector<LogEventWithLevel<four_byte_encoded_variable_t>> m_encoded_log_events;
std::unique_ptr<StreamReaderDataContext<four_byte_encoded_variable_t>>
m_stream_reader_data_context;
FilteredLogEventsMap m_filtered_log_event_map;
clp::TimestampPattern m_ts_pattern;
Expand Down
36 changes: 36 additions & 0 deletions src/clp_ffi_js/ir/decoding_methods.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include "decoding_methods.hpp"

#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ReaderInterface.hpp>
#include <clp/TraceableException.hpp>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>

namespace clp_ffi_js::ir {
auto rewind_reader_and_verify_encoding_type(clp::ReaderInterface& reader) -> void {
reader.seek_from_begin(0);

bool is_four_bytes_encoding{true};
if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)};
clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err)
{
SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err);
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_MetadataCorrupted,
__FILENAME__,
__LINE__,
"Failed to decode encoding type."
};
}
if (false == is_four_bytes_encoding) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
"IR stream uses unsupported encoding."
};
}
}
} // namespace clp_ffi_js::ir
10 changes: 10 additions & 0 deletions src/clp_ffi_js/ir/decoding_methods.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#ifndef CLP_FFI_JS_IR_DECODING_METHODS_HPP
#define CLP_FFI_JS_IR_DECODING_METHODS_HPP

#include <clp/ReaderInterface.hpp>

namespace clp_ffi_js::ir {
auto rewind_reader_and_verify_encoding_type(clp::ReaderInterface& reader) -> void;
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_IR_DECODING_METHODS_HPP

0 comments on commit 3dae806

Please sign in to comment.