Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StructuredIrStreamReader. #29

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,16 @@ endif()

# Source files under src/clp_ffi_js/ir/: StreamReader plus its structured and unstructured IR
# stream reader variants.
set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/StreamReader.cpp
src/clp_ffi_js/ir/StructuredIrStreamReader.cpp
src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
)

set(CLP_FFI_JS_SRC_CLP_CORE
src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/utils.cpp
src/submodules/clp/components/core/src/clp/ffi/KeyValuePairLogEvent.cpp
src/submodules/clp/components/core/src/clp/ffi/SchemaTree.cpp
src/submodules/clp/components/core/src/clp/ir/EncodedTextAst.cpp
src/submodules/clp/components/core/src/clp/ir/LogEventDeserializer.cpp
src/submodules/clp/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp
Expand Down
52 changes: 39 additions & 13 deletions src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/ir/UnstructuredIrStreamReader.hpp>
#include <clp_ffi_js/ir/StructuredIrStreamReader.hpp>

namespace {
using ClpFfiJsException = clp_ffi_js::ClpFfiJsException;
Expand Down Expand Up @@ -117,6 +118,9 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
// JS types used as inputs
emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
emscripten::register_type<clp_ffi_js::ir::LogLevelFilterTsType>("number[] | null");
emscripten::register_type<clp_ffi_js::ir::ReaderOptions>(
"interface{logLevelKey: string, timestampKey: string} | null"
);

// JS types used as outputs
emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
Expand All @@ -143,7 +147,8 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
} // namespace

namespace clp_ffi_js::ir {
auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<StreamReader> {
auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options)
-> std::unique_ptr<StreamReader> {
auto const length{data_array["length"].as<size_t>()};
SPDLOG_INFO("StreamReader::create: got buffer of length={}", length);

Expand All @@ -162,16 +167,28 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<
// Validate the stream's version
auto pos = zstd_decompressor->get_pos();
auto const version{get_version(*zstd_decompressor)};
if (std::ranges::find(cUnstructuredIrVersions, version) == cUnstructuredIrVersions.end()) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
std::format("Unable to create reader for IR stream with version {}.", version)
};
if (std::ranges::find(cUnstructuredIrVersions, version) != cUnstructuredIrVersions.end()) {
try {
zstd_decompressor->seek_from_begin(pos);
} catch (ZstdDecompressor::OperationFailed& e) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
std::format("Unable to rewind zstd decompressor: {}", e.what())
};
}
return std::make_unique<UnstructuredIrStreamReader>(UnstructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer)
));
}
// if (clp::ffi::ir_stream::IRProtocolErrorCode_Supported
// == clp::ffi::ir_stream::validate_protocol_version(version))
// {
// FIXME: wait for https://github.com/y-scope/clp/pull/573
try {
zstd_decompressor->seek_from_begin(pos);
zstd_decompressor->seek_from_begin(0);
} catch (ZstdDecompressor::OperationFailed& e) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
Expand All @@ -180,9 +197,18 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<
std::format("Unable to rewind zstd decompressor: {}", e.what())
};
}

return std::make_unique<UnstructuredIrStreamReader>(
UnstructuredIrStreamReader::create(std::move(zstd_decompressor), std::move(data_buffer))
);
return std::make_unique<StructuredIrStreamReader>(StructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer),
reader_options
));
// }

// throw ClpFfiJsException{
// clp::ErrorCode::ErrorCode_Unsupported,
// __FILENAME__,
// __LINE__,
// std::format("Unable to create reader for IR stream with version {}.", version)
// };
}
} // namespace clp_ffi_js::ir
4 changes: 3 additions & 1 deletion src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace clp_ffi_js::ir {
// JS types used as inputs
EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(LogLevelFilterTsType);
EMSCRIPTEN_DECLARE_VAL_TYPE(ReaderOptions);

// JS types used as outputs
EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType);
Expand All @@ -36,7 +37,8 @@ class StreamReader {
* @return The created instance.
* @throw ClpFfiJsException if any error occurs.
*/
[[nodiscard]] static auto create(DataArrayTsType const& data_array
[[nodiscard]] static auto create(DataArrayTsType const& data_array,
ReaderOptions const& reader_options
) -> std::unique_ptr<StreamReader>;

// Destructor
Expand Down
4 changes: 4 additions & 0 deletions src/clp_ffi_js/ir/StreamReaderDataContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class StreamReaderDataContext {
*/
[[nodiscard]] auto get_deserializer() -> Deserializer& { return m_deserializer; }

/**
 * @return A reference to the reader object that `m_reader` points to.
 */
[[nodiscard]] auto get_reader() -> clp::ReaderInterface& {
    auto& reader{*m_reader};
    return reader;
}
private:
clp::Array<char> m_data_buffer;
std::unique_ptr<clp::ReaderInterface> m_reader;
Expand Down
169 changes: 169 additions & 0 deletions src/clp_ffi_js/ir/StructuredIrStreamReader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
#ifndef CLP_FFI_JS_IR_STRUCTUREDIRSTREAMREADER_HPP
#define CLP_FFI_JS_IR_STRUCTUREDIRSTREAMREADER_HPP

#include <Array.hpp>
#include <cstddef>
#include <memory>
#include <optional>
#include <vector>

#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ir/LogEventDeserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/TimestampPattern.hpp>
#include <emscripten/val.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReader.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

namespace clp_ffi_js::ir {
using parsed_tree_node_id_t = std::optional<clp::ffi::SchemaTree::Node::id_t>;

/**
* Class to handle deserialized IR units.
*/
class IrUnitHandler {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to move this class into a separate header? That way we don't have to write `using StructuredIrDeserializer = clp::ffi::ir_stream::Deserializer<IrUnitHandler>;` after this class.

public:
/**
 * @param deserialized_log_events Vector that receives every log event passed to
 * `handle_log_event`. It is held by reference, so it must outlive this handler.
 * @param log_level_key Key name compared against each inserted schema-tree node's key name to
 * locate the log-level node.
 * @param timestamp_key Key name compared against each inserted schema-tree node's key name to
 * locate the timestamp node.
 */
IrUnitHandler(
        std::vector<clp::ffi::KeyValuePairLogEvent>& deserialized_log_events,
        std::string log_level_key,
        std::string timestamp_key
)
        // NOTE: The initializers are listed in member-declaration order, which is the order in
        // which they actually run.
        : m_log_level_key{std::move(log_level_key)},
          m_timestamp_key{std::move(timestamp_key)},
          // Start at the root node's ID (0) so that the pre-increment in
          // `handle_schema_tree_node_insertion` yields 1 for the first inserted node. Without
          // this initializer the counter would be incremented while still indeterminate.
          m_current_node_id{0},
          m_deserialized_log_events{deserialized_log_events} {}

// Implements `clp::ffi::ir_stream::IrUnitHandlerInterface` interface
/**
 * Appends the given log event to the vector supplied at construction.
 * @param log_event The deserialized log event; it is moved into the vector.
 * @return IRErrorCode_Success always.
 */
[[nodiscard]] auto handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event)
        -> clp::ffi::ir_stream::IRErrorCode {
    auto const result{clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success};
    m_deserialized_log_events.push_back(std::move(log_event));
    return result;
}

/**
 * Handles a UTC offset change. Neither offset is recorded; a warning is logged and the change
 * is otherwise ignored.
 * @param utc_offset_old Unused.
 * @param utc_offset_new Unused.
 * @return IRErrorCode_Success always.
 */
[[nodiscard]] static auto handle_utc_offset_change(
        [[maybe_unused]] clp::UtcOffset utc_offset_old,
        [[maybe_unused]] clp::UtcOffset utc_offset_new
) -> clp::ffi::ir_stream::IRErrorCode {
    auto const result{clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success};
    SPDLOG_WARN("UTC offset change packets are currently not handled.");
    return result;
}

[[nodiscard]] auto handle_schema_tree_node_insertion(
[[maybe_unused]] clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator
) -> clp::ffi::ir_stream::IRErrorCode {
++m_current_node_id;
auto const& key_name{schema_tree_node_locator.get_key_name()};

if (m_log_level_key == key_name) {
m_level_node_id.emplace(m_current_node_id);
} else if (m_timestamp_key == key_name) {
m_timestamp_node_id.emplace(m_current_node_id);
}

return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success;
}

/**
 * Does nothing beyond reporting success.
 *
 * FIXME: the original author asked whether this handler is needed at all.
 * NOTE(review): presumably it's required by `clp::ffi::ir_stream::IrUnitHandlerInterface` --
 * confirm before removing.
 * @return IRErrorCode_Success always.
 */
[[nodiscard]] static auto handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode {
    auto const result{clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success};
    return result;
}

// Methods
/**
 * @return A read-only reference to the vector that `handle_log_event` appends to.
 */
[[nodiscard]] auto get_deserialized_log_events() const
        -> std::vector<clp::ffi::KeyValuePairLogEvent> const& {
    auto const& log_events{m_deserialized_log_events};
    return log_events;
}

/**
 * @return The recorded ID of the node whose key name matched the log-level key, or an empty
 * optional if no such node has been recorded.
 */
[[nodiscard]] auto get_level_node_id() const -> parsed_tree_node_id_t {
    parsed_tree_node_id_t level_node_id{m_level_node_id};
    return level_node_id;
}

/**
 * @return The recorded ID of the node whose key name matched the timestamp key, or an empty
 * optional if no such node has been recorded.
 */
[[nodiscard]] auto get_timestamp_node_id() const -> parsed_tree_node_id_t {
    parsed_tree_node_id_t timestamp_node_id{m_timestamp_node_id};
    return timestamp_node_id;
}

private:
// Key names (as passed to the constructor) that are compared against the key name of every
// inserted schema-tree node.
std::string m_log_level_key;
std::string m_timestamp_key;

// ID of the most recently inserted schema-tree node; pre-incremented once per insertion. The
// root node has id=0, so this counter needs to start at 0.
// NOTE(review): this declaration has no default member initializer, so the constructor must
// initialize it explicitly; otherwise the first increment reads an indeterminate value.
clp::ffi::SchemaTree::Node::id_t m_current_node_id;
// IDs of the nodes whose key names matched the log-level and timestamp keys; empty until a
// matching node is inserted.
parsed_tree_node_id_t m_level_node_id;
parsed_tree_node_id_t m_timestamp_node_id;

// Destination for handled log events. Held by reference; the vector is owned by the caller and
// must outlive this handler.
std::vector<clp::ffi::KeyValuePairLogEvent>& m_deserialized_log_events;
// NOTE(review): not read or written by any method of this class as shown here; possibly
// vestigial -- confirm and remove if unused.
bool m_is_complete{false};
};

using StructuredIrDeserializer = clp::ffi::ir_stream::Deserializer<IrUnitHandler>;

/**
 * Class to deserialize and decode Zstd-compressed CLP structured IR streams, as well as format
 * decoded log events.
 *
 * Instances can only be obtained through `create` (the constructor is private). They are
 * move-constructible, but neither copyable nor assignable.
 */
class StructuredIrStreamReader : public StreamReader {
public:
// Destructor
~StructuredIrStreamReader() override = default;

// Disable copy constructor and assignment operator
StructuredIrStreamReader(StructuredIrStreamReader const&) = delete;
auto operator=(StructuredIrStreamReader const&) -> StructuredIrStreamReader& = delete;

// Define default move constructor
StructuredIrStreamReader(StructuredIrStreamReader&&) = default;
// Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`.
// NOTE(review): this class holds a `StructuredIrDeserializer`
// (`clp::ffi::ir_stream::Deserializer<IrUnitHandler>`), not a `clp::ir::LogEventDeserializer`,
// so the rationale above looks carried over from the unstructured reader -- confirm the actual
// reason and update this comment.
auto operator=(StructuredIrStreamReader&&) -> StructuredIrStreamReader& = delete;

/**
 * @param zstd_decompressor A decompressor for an IR stream, where the read head of the stream
 * is just after the stream's encoding type.
 * NOTE(review): `StreamReader::create` appears to rewind the decompressor to the beginning of
 * the stream before calling this method for structured streams -- confirm which read-head
 * position is actually expected and correct the description above if needed.
 * @param data_array The array backing `zstd_decompressor`.
 * @param reader_options Reader options received from JS. Presumably an object carrying the
 * `logLevelKey` and `timestampKey` used to locate the log-level and timestamp nodes (or null)
 * -- TODO confirm against the implementation.
 * @return The created instance.
 * @throw ClpFfiJsException if any error occurs.
 */
[[nodiscard]] static auto create(
std::unique_ptr<ZstdDecompressor>&& zstd_decompressor,
clp::Array<char> data_array,
ReaderOptions const& reader_options
) -> StructuredIrStreamReader;

// Overrides of the `StreamReader` interface follow; see the base class for each contract.
[[nodiscard]] auto get_num_events_buffered() const -> size_t override;

[[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override;

void filter_log_events(LogLevelFilterTsType const& log_level_filter) override;

/**
 * @see StreamReader::deserialize_stream
 *
 * After the stream has been exhausted, it will be deallocated.
 *
 * @return @see StreamReader::deserialize_stream
 */
[[nodiscard]] auto deserialize_stream() -> size_t override;

[[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType override;

private:
// Constructor
// Takes ownership of the data context (by move) and shares ownership of the log-event vector.
explicit StructuredIrStreamReader(
StreamReaderDataContext<StructuredIrDeserializer>&& stream_reader_data_context,
std::shared_ptr<std::vector<clp::ffi::KeyValuePairLogEvent>> deserialized_log_events
);

// Variables
// Log events deserialized so far.
// NOTE(review): `IrUnitHandler` stores a reference to a log-event vector; presumably it refers
// to this one, which would explain the heap allocation (the vector's address stays stable when
// this reader is moved) -- confirm in the implementation.
std::shared_ptr<std::vector<clp::ffi::KeyValuePairLogEvent>> m_deserialized_log_events;
// Data context (backing buffer, reader, and deserializer) for the stream being read.
std::unique_ptr<StreamReaderDataContext<StructuredIrDeserializer>> m_stream_reader_data_context;

// Schema-tree node IDs for the log-level and timestamp keys; empty when unknown. Presumably
// copied from the `IrUnitHandler` -- TODO confirm.
parsed_tree_node_id_t m_level_node_id;
parsed_tree_node_id_t m_timestamp_node_id;
};
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_IR_STRUCTUREDIRSTREAMREADER_HPP
2 changes: 1 addition & 1 deletion src/submodules/clp
Submodule clp updated from e1f3f2 to ee0ade
Loading