Skip to content

Commit

Permalink
Extract a StreamReader base class from IRStreamReader.
Browse files Browse the repository at this point in the history
  • Loading branch information
junhaoliao committed Oct 15, 2024
1 parent cfd55ca commit d1ed2f0
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 19 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/)
set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/decoding_methods.cpp
src/clp_ffi_js/ir/IRStreamReader.cpp
src/clp_ffi_js/ir/StreamReader.cpp
)

set(CLP_FFI_JS_SRC_CLP_CORE
Expand Down
11 changes: 4 additions & 7 deletions src/clp_ffi_js/ir/IRStreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/decoding_methods.hpp>
#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReader.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

using namespace std::literals::string_literals;
Expand Down Expand Up @@ -241,13 +242,9 @@ auto IRStreamReader::create_deserializer_and_data_context(

namespace {
EMSCRIPTEN_BINDINGS(ClpIRStreamReader) {
emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
"Array<[string, number, number, number]>"
);
emscripten::register_type<clp_ffi_js::ir::FilteredLogEventMapTsType>("number[] | null");

emscripten::class_<clp_ffi_js::ir::IRStreamReader>("ClpIRStreamReader")
emscripten::class_<
clp_ffi_js::ir::IRStreamReader,
emscripten::base<clp_ffi_js::ir::StreamReader>>("ClpIRStreamReader")
.constructor(
&clp_ffi_js::ir::IRStreamReader::create,
emscripten::return_value_policy::take_ownership()
Expand Down
23 changes: 11 additions & 12 deletions src/clp_ffi_js/ir/IRStreamReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@
#include <emscripten/val.h>

#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReader.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

using clp::ir::four_byte_encoded_variable_t;

namespace clp_ffi_js::ir {
EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType);

/**
* Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded
* log events.
*/
class IRStreamReader {
class IRStreamReader : public StreamReader {
friend StreamReader;

public:
/**
* Mapping between an index in the filtered log events collection to an index in the unfiltered
Expand All @@ -44,7 +43,7 @@ class IRStreamReader {
[[nodiscard]] static auto create(DataArrayTsType const& data_array) -> IRStreamReader;

// Destructor
~IRStreamReader() = default;
~IRStreamReader() override = default;

// Disable copy constructor and assignment operator
IRStreamReader(IRStreamReader const&) = delete;
Expand All @@ -59,27 +58,27 @@ class IRStreamReader {
/**
* @return The number of events buffered.
*/
[[nodiscard]] auto get_num_events_buffered() const -> size_t;
[[nodiscard]] auto get_num_events_buffered() const -> size_t override;

/**
* @return The filtered log events map.
*/
[[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType;
[[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override;

/**
* Generates a filtered collection from all log events.
*
* @param log_level_filter Array of selected log levels
*/
void filter_log_events(emscripten::val const& log_level_filter);
void filter_log_events(emscripten::val const& log_level_filter) override;

/**
* Deserializes all log events in the stream. After the stream has been exhausted, it will be
* deallocated.
*
* @return The number of successfully deserialized ("valid") log events.
*/
[[nodiscard]] auto deserialize_stream() -> size_t;
[[nodiscard]] auto deserialize_stream() -> size_t override;

/**
* Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered
Expand All @@ -96,8 +95,8 @@ class IRStreamReader {
* @return null if any log event in the range doesn't exist (e.g. the range exceeds the number
* of log events in the collection).
*/
[[nodiscard]] auto
decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType;
[[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType override;

private:
// Constructor
Expand Down
70 changes: 70 additions & 0 deletions src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include "StreamReader.hpp"

#include <cstddef>
#include <cstdint>
#include <ErrorCode.hpp>
#include <format>
#include <memory>
#include <utility>

#include <clp/Array.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <clp/TraceableException.hpp>
#include <emscripten/bind.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/ir/decoding_methods.hpp>
#include <clp_ffi_js/ir/IRStreamReader.hpp>

namespace clp_ffi_js::ir {
auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<StreamReader> {
auto const length{data_array["length"].as<size_t>()};
SPDLOG_INFO("KVPairIRStreamReader::create: got buffer of length={}", length);

// Copy array from JavaScript to C++
clp::Array<char> data_buffer{length};
// NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast)
emscripten::val::module_property("HEAPU8")
.call<void>("set", data_array, reinterpret_cast<uintptr_t>(data_buffer.data()));
// NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast)

auto zstd_decompressor{std::make_unique<clp::streaming_compression::zstd::Decompressor>()};
zstd_decompressor->open(data_buffer.data(), length);

auto const version{get_version(*zstd_decompressor)};
if (version == "v0.0.0") {
auto stream_reader_data_context{IRStreamReader::create_deserializer_and_data_context(
std::move(zstd_decompressor),
std::move(data_buffer)
)};

return std::unique_ptr<IRStreamReader>(
new IRStreamReader(std::move(stream_reader_data_context))
);
}

throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
std::format("Unable to create stream reader for IR data with version {}.", version)
};
}
} // namespace clp_ffi_js::ir

namespace {
EMSCRIPTEN_BINDINGS(ClpStreamReader) {
emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
"Array<[string, number, number, number]>"
);
emscripten::register_type<clp_ffi_js::ir::FilteredLogEventMapTsType>("number[] | null");

emscripten::class_<clp_ffi_js::ir::StreamReader>("ClpStreamReader")
.constructor(
&clp_ffi_js::ir::StreamReader::create,
emscripten::return_value_policy::take_ownership()
);
}
} // namespace
89 changes: 89 additions & 0 deletions src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#ifndef CLP_FFI_JS_IR_STREAM_READER_HPP
#define CLP_FFI_JS_IR_STREAM_READER_HPP

#include <cstddef>
#include <memory>

#include <emscripten/val.h>

namespace clp_ffi_js::ir {
EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType);

/**
* Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded
* log events.
*/
class StreamReader {
public:
/**
* Creates a StreamReader to read from the given array.
*
* @param data_array An array containing a Zstandard-compressed IR stream.
* @return The created instance.
* @throw ClpFfiJsException if any error occurs.
*/
[[nodiscard]] static auto create(DataArrayTsType const& data_array
) -> std::unique_ptr<StreamReader>;

// Destructor
virtual ~StreamReader() = default;

// Disable copy constructor and assignment operator
StreamReader(StreamReader const&) = delete;
auto operator=(StreamReader const&) -> StreamReader& = delete;

// Define default move constructor
StreamReader(StreamReader&&) = default;
// Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`.
auto operator=(StreamReader&&) -> StreamReader& = delete;

/**
* @return The number of events buffered.
*/
[[nodiscard]] virtual auto get_num_events_buffered() const -> size_t = 0;

/**
* @return The filtered log events map.
*/
[[nodiscard]] virtual auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType = 0;

/**
* Generates a filtered collection from all log events.
*
* @param log_level_filter Array of selected log levels
*/
virtual void filter_log_events(emscripten::val const& log_level_filter) = 0;

/**
* Deserializes all log events in the stream. After the stream has been exhausted, it will be
* deallocated.
*
* @return The number of successfully deserialized ("valid") log events.
*/
[[nodiscard]] virtual auto deserialize_stream() -> size_t = 0;

/**
* Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered
* (depending on the value of `useFilter`) log events collection.
*
* @param begin_idx
* @param end_idx
* @param use_filter Whether to decode from the filtered or unfiltered log events collection.
* @return An array where each element is a decoded log event represented by an array of:
* - The log event's message
* - The log event's timestamp as milliseconds since the Unix epoch
* - The log event's log level as an integer that indexes into `cLogLevelNames`
* - The log event's number (1-indexed) in the stream
* @return null if any log event in the range doesn't exist (e.g. the range exceeds the number
* of log events in the collection).
*/
[[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType = 0;

protected:
explicit StreamReader() = default;
};
} // namespace clp_ffi_js::ir
#endif // CLP_FFI_JS_IR_STREAM_READER_HPP

0 comments on commit d1ed2f0

Please sign in to comment.