Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
davemarco committed Nov 6, 2024
1 parent 3764298 commit c81cd4b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 92 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ target_include_directories(
target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/)

set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
src/clp_ffi_js/ir/StreamReader.cpp
src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
)

set(CLP_FFI_JS_SRC_CLP_CORE
Expand Down
116 changes: 65 additions & 51 deletions src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <cstddef>
#include <cstdint>
#include <format>
#include <json/single_include/nlohmann/json.hpp>
#include <memory>
#include <string>
#include <string_view>
Expand All @@ -12,36 +13,43 @@

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <clp/ReaderInterface.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <clp/TraceableException.hpp>
#include <clp/type_utils.hpp>

#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ffi/ir_stream/protocol_constants.hpp>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/ir/UnstructuredIrStreamReader.hpp>

#include <json/single_include/nlohmann/json.hpp>
#include <emscripten/bind.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/ir/UnstructuredIrStreamReader.hpp>

namespace clp_ffi_js::ir {

namespace {
using ClpFfiJsException = clp_ffi_js::ClpFfiJsException;
using IRErrorCode = clp::ffi::ir_stream::IRErrorCode;

/**
* Rewinds the reader to start then validates the CLP IR data encoding type.
* @param reader
* @throws ClpFfiJsException if the encoding type couldn't be decoded or the encoding type is
* unsupported.
*/
auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void {
reader.seek_from_begin(0);

bool is_four_bytes_encoding{true};
if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)};
clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err)
IRErrorCode::IRErrorCode_Success != err)
{
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_MetadataCorrupted,
__FILENAME__,
__LINE__,
std::format("Failed to decode encoding type, err={}", clp::enum_to_underlying_type(err))
std::format(
"Failed to decode encoding type: IR error code {}",
clp::enum_to_underlying_type(err)
)
};
}
if (false == is_four_bytes_encoding) {
Expand All @@ -54,21 +62,26 @@ auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> v
}
}

/**
* Gets the version of the IR stream.
* @param reader
* @throws ClpFfiJsException if the preamble couldn't be deserialized.
* @return The IR stream's version.
*/
auto get_version(clp::ReaderInterface& reader) -> std::string {
// Deserialize metadata bytes from preamble.
clp::ffi::ir_stream::encoded_tag_t metadata_type{};
std::vector<int8_t> metadata_bytes;
auto const deserialize_preamble_result{
clp::ffi::ir_stream::deserialize_preamble(reader, metadata_type, metadata_bytes)
auto const err{clp::ffi::ir_stream::deserialize_preamble(reader, metadata_type, metadata_bytes)
};
if (clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != deserialize_preamble_result) {
if (IRErrorCode::IRErrorCode_Success != err) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
std::format(
"Failed to deserialize preamble for version reading: {}",
clp::enum_to_underlying_type(deserialize_preamble_result)
"Failed to deserialize preamble: IR error code {}",
clp::enum_to_underlying_type(err)
)
};
}
Expand All @@ -91,10 +104,41 @@ auto get_version(clp::ReaderInterface& reader) -> std::string {
};
}

SPDLOG_INFO("The version is {}", version);
SPDLOG_INFO("IR version is {}", version);
return version;
}

EMSCRIPTEN_BINDINGS(ClpStreamReader) {
// JS types used as inputs
emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
emscripten::register_type<clp_ffi_js::ir::LogLevelFilterTsType>("number[] | null");

// JS types used as outputs
emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
"Array<[string, number, number, number]>"
);
emscripten::register_type<clp_ffi_js::ir::FilteredLogEventMapTsType>("number[] | null");
emscripten::class_<clp_ffi_js::ir::StreamReader>("ClpStreamReader")
.constructor(
&clp_ffi_js::ir::StreamReader::create,
emscripten::return_value_policy::take_ownership()
)
.function(
"getNumEventsBuffered",
&clp_ffi_js::ir::StreamReader::get_num_events_buffered
)
.function(
"getFilteredLogEventMap",
&clp_ffi_js::ir::StreamReader::get_filtered_log_event_map
)
.function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events)
.function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream)
.function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range);
}
} // namespace

namespace clp_ffi_js::ir {

auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<StreamReader> {
auto const length{data_array["length"].as<size_t>()};
SPDLOG_INFO("StreamReader::create: got buffer of length={}", length);
Expand All @@ -119,47 +163,17 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<
zstd_decompressor->seek_from_begin(reader_offset);

if (std::ranges::find(cUnstructuredIrVersions, version) != cUnstructuredIrVersions.end()) {
return UnstructuredIrStreamReader::create(
return std::make_unique<UnstructuredIrStreamReader>(UnstructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer)
);
));
}

throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
std::format("Unable to create reader for CLP stream with version {}.", version)
std::format("Unable to create reader for IR stream with version {}.", version)
};
}
} // namespace clp_ffi_js::ir

namespace {
EMSCRIPTEN_BINDINGS(ClpStreamReader) {
// JS types used as outputs
emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
"Array<[string, number, number, number]>"
);
emscripten::register_type<clp_ffi_js::ir::FilteredLogEventMapTsType>("number[] | null");

// JS types used as inputs
emscripten::register_type<clp_ffi_js::ir::LogLevelFilterTsType>("number[] | null");
emscripten::class_<clp_ffi_js::ir::StreamReader>("ClpStreamReader")
.constructor(
&clp_ffi_js::ir::StreamReader::create,
emscripten::return_value_policy::take_ownership()
)
.function(
"getNumEventsBuffered",
&clp_ffi_js::ir::StreamReader::get_num_events_buffered
)
.function(
"getFilteredLogEventMap",
&clp_ffi_js::ir::StreamReader::get_filtered_log_event_map
)
.function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events)
.function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream)
.function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range);
}
} // namespace
28 changes: 7 additions & 21 deletions src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef CLP_FFI_JS_IR_STREAM_READER_HPP
#define CLP_FFI_JS_IR_STREAM_READER_HPP
#ifndef CLP_FFI_JS_IR_STREAMREADER_HPP
#define CLP_FFI_JS_IR_STREAMREADER_HPP

#include <array>
#include <cstddef>
Expand All @@ -11,31 +11,17 @@

namespace clp_ffi_js::ir {

// JS types used as inputs
EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(LogLevelFilterTsType);

// JS types used as outputs
EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(LogLevelFilterTsType);

constexpr std::array<std::string_view, 6> cUnstructuredIrVersions
= {"v0.0.2", "v0.0.1", "v0.0.0", "0.0.2", "0.0.1", "0.0.0"};

/**
* Rewinds the reader to the beginning and validates the CLP IR data encoding type.
* @param reader
* @throws ClpFfiJsException if the encoding type couldn't be decoded or the encoding type is
* unsupported.
*/
static auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void;

/**
* Gets the version of the IR stream from the specified reader.
* @param reader
* @throws Propagates `rewind_reader_and_validate_encoding_type`'s exceptions.
* @throws ClpFfiJsException if the preamble couldn't be deserialized.
* @return The stream's version.
*/
static auto get_version(clp::ReaderInterface& reader) -> std::string;

/**
* Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded
* log events.
Expand Down Expand Up @@ -114,4 +100,4 @@ class StreamReader {
explicit StreamReader() = default;
};
} // namespace clp_ffi_js::ir
#endif // CLP_FFI_JS_IR_STREAM_READER_HPP
#endif // CLP_FFI_JS_IR_STREAMREADER_HPP
8 changes: 2 additions & 6 deletions src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <clp/ErrorCode.hpp>
#include <clp/ir/LogEventDeserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <clp/TraceableException.hpp>
#include <clp/type_utils.hpp>
#include <emscripten/bind.h>
Expand All @@ -39,8 +38,7 @@ using clp::ir::four_byte_encoded_variable_t;
auto UnstructuredIrStreamReader::create(
std::unique_ptr<ZstdDecompressor>&& zstd_decompressor,
clp::Array<char>&& data_array
) -> std::unique_ptr<StreamReader> {

) -> UnstructuredIrStreamReader {
auto result{
clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>::create(*zstd_decompressor)
};
Expand All @@ -62,9 +60,7 @@ auto UnstructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(result.value())
);
return std::make_unique<UnstructuredIrStreamReader>(
UnstructuredIrStreamReader(std::move(data_context))
);
return UnstructuredIrStreamReader(std::move(data_context));
}

auto UnstructuredIrStreamReader::get_num_events_buffered() const -> size_t {
Expand Down
26 changes: 13 additions & 13 deletions src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#ifndef CLP_FFI_JS_IR_UNSTRUCTUREDUnstructuredIrStreamReader_HPP
#define CLP_FFI_JS_IR_UNSTRUCTUREDUnstructuredIrStreamReader_HPP
#ifndef CLP_FFI_JS_IR_UNSTRUCTUREDIRSTREAMREADER_HPP
#define CLP_FFI_JS_IR_UNSTRUCTUREDIRSTREAMREADER_HPP

#include <Array.hpp>
#include <cstddef>
#include <memory>
#include <optional>
#include <streaming_compression/zstd/Decompressor.hpp>
#include <vector>

#include <clp/ir/types.hpp>
Expand All @@ -30,8 +29,6 @@ using FilteredLogEventsMap = std::optional<std::vector<size_t>>;
* decoded log events.
*/
class UnstructuredIrStreamReader : public StreamReader {
friend StreamReader;

public:
// Destructor
~UnstructuredIrStreamReader() override = default;
Expand All @@ -45,10 +42,19 @@ class UnstructuredIrStreamReader : public StreamReader {
// Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`.
auto operator=(UnstructuredIrStreamReader&&) -> UnstructuredIrStreamReader& = delete;

/**
* First packages a `StreamReaderDataContext` using inputs and a unstructured IR deserializer,
* then creates a `UnstructuredIrStreamReader`.
*
* @param zstd_decompressor
* @param data_array An array containing a Zstandard-compressed IR stream.
* @return The created instance.
* @throw ClpFfiJsException if any error occurs.
*/
[[nodiscard]] static auto create(
std::unique_ptr<ZstdDecompressor>&& zstd_decompressor,
clp::Array<char>&& data_array
) -> std::unique_ptr<StreamReader>;
) -> UnstructuredIrStreamReader;

[[nodiscard]] auto get_num_events_buffered() const -> size_t override;

Expand All @@ -67,12 +73,6 @@ class UnstructuredIrStreamReader : public StreamReader {
StreamReaderDataContext<four_byte_encoded_variable_t>&& stream_reader_data_context
);

// Methods
[[nodiscard]] static auto create_data_context(
std::unique_ptr<clp::streaming_compression::zstd::Decompressor>&& zstd_decompressor,
clp::Array<char>&& data_buffer
) -> StreamReaderDataContext<four_byte_encoded_variable_t>;

// Variables
std::vector<LogEventWithLevel<four_byte_encoded_variable_t>> m_encoded_log_events;
std::unique_ptr<StreamReaderDataContext<four_byte_encoded_variable_t>>
Expand All @@ -82,4 +82,4 @@ class UnstructuredIrStreamReader : public StreamReader {
};
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_IR_UNSTRUCTUREDUnstructuredIrStreamReader_HPP
#endif // CLP_FFI_JS_IR_UNSTRUCTUREDIRSTREAMREADER_HPP

0 comments on commit c81cd4b

Please sign in to comment.