diff --git a/CMakeLists.txt b/CMakeLists.txt index 204352ce..73ad4966 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -83,11 +83,16 @@ endif() set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp + src/clp_ffi_js/ir/StructuredIrStreamReader.cpp src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp ) set(CLP_FFI_JS_SRC_CLP_CORE src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp + src/submodules/clp/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp + src/submodules/clp/components/core/src/clp/ffi/ir_stream/utils.cpp + src/submodules/clp/components/core/src/clp/ffi/KeyValuePairLogEvent.cpp + src/submodules/clp/components/core/src/clp/ffi/SchemaTree.cpp src/submodules/clp/components/core/src/clp/ir/EncodedTextAst.cpp src/submodules/clp/components/core/src/clp/ir/LogEventDeserializer.cpp src/submodules/clp/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index c6b097cb..98e5cb84 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -23,6 +23,7 @@ #include #include +#include #include namespace { @@ -117,6 +118,9 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) { // JS types used as inputs emscripten::register_type("Uint8Array"); emscripten::register_type("number[] | null"); + emscripten::register_type( + "interface{logLevelKey: string, timestampKey: string} | null" + ); // JS types used as outputs emscripten::register_type( @@ -143,7 +147,8 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) { } // namespace namespace clp_ffi_js::ir { -auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr { +auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options) + -> std::unique_ptr { auto const length{data_array["length"].as()}; SPDLOG_INFO("StreamReader::create: got buffer of length={}", length); @@ -162,16 +167,28 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr< // Validate the stream's version auto pos = zstd_decompressor->get_pos(); auto const version{get_version(*zstd_decompressor)}; - if (std::ranges::find(cUnstructuredIrVersions, version) == cUnstructuredIrVersions.end()) { - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Unsupported, - __FILENAME__, - __LINE__, - std::format("Unable to create reader for IR stream with version {}.", version) - }; + if (std::ranges::find(cUnstructuredIrVersions, version) != cUnstructuredIrVersions.end()) { + try { + zstd_decompressor->seek_from_begin(pos); + } catch (ZstdDecompressor::OperationFailed& e) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + std::format("Unable to rewind zstd decompressor: {}", e.what()) + }; + } + return std::make_unique(UnstructuredIrStreamReader::create( + std::move(zstd_decompressor), + std::move(data_buffer) + )); } + // if (clp::ffi::ir_stream::IRProtocolErrorCode_Supported + // == clp::ffi::ir_stream::validate_protocol_version(version)) + // { + // FIXME: wait for https://github.com/y-scope/clp/pull/573 try { - zstd_decompressor->seek_from_begin(pos); + zstd_decompressor->seek_from_begin(0); } catch (ZstdDecompressor::OperationFailed& e) { throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Failure, @@ -180,9 +197,18 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr< std::format("Unable to rewind zstd decompressor: {}", e.what()) }; } - - return std::make_unique( - UnstructuredIrStreamReader::create(std::move(zstd_decompressor), std::move(data_buffer)) - ); + return std::make_unique(StructuredIrStreamReader::create( + std::move(zstd_decompressor), + std::move(data_buffer), + reader_options + )); + // } + + // throw ClpFfiJsException{ + // clp::ErrorCode::ErrorCode_Unsupported, + // __FILENAME__, + // __LINE__, + // std::format("Unable to create reader for IR stream with version {}.", version) + // }; } } // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index 5f298674..111b110f 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -13,6 +13,7 @@ namespace clp_ffi_js::ir { // JS types used as inputs EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(LogLevelFilterTsType); +EMSCRIPTEN_DECLARE_VAL_TYPE(ReaderOptions); // JS types used as outputs EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); @@ -36,7 +37,9 @@ class StreamReader { * @return The created instance. * @throw ClpFfiJsException if any error occurs. */ - [[nodiscard]] static auto create(DataArrayTsType const& data_array + [[nodiscard]] static auto create( + DataArrayTsType const& data_array, + ReaderOptions const& reader_options ) -> std::unique_ptr; // Destructor diff --git a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp index c32ad35c..b28cead4 100644 --- a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp +++ b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp @@ -44,6 +44,11 @@ class StreamReaderDataContext { */ [[nodiscard]] auto get_deserializer() -> Deserializer& { return m_deserializer; } + /** + * @return A reference to the reader. + */ + [[nodiscard]] auto get_reader() -> clp::ReaderInterface& { return *m_reader; } + private: clp::Array m_data_buffer; std::unique_ptr m_reader; diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp new file mode 100644 index 00000000..3d52d48a --- /dev/null +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp @@ -0,0 +1,195 @@ +#include "StructuredIrStreamReader.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace clp_ffi_js::ir { + +using namespace std::literals::string_literals; +using clp::ir::four_byte_encoded_variable_t; + +constexpr std::string_view cLogLevelFilteringNotSupportedPrompt{ + "Log level filtering is not yet supported in this reader." +}; + +auto StructuredIrStreamReader::create( + std::unique_ptr&& zstd_decompressor, + clp::Array data_array, + ReaderOptions const& reader_options +) -> StructuredIrStreamReader { + auto deserialized_log_events{std::make_shared>()}; + auto result{StructuredIrDeserializer::create( + *zstd_decompressor, + IrUnitHandler( + deserialized_log_events, + reader_options["logLevelKey"].as(), + reader_options["timestampKey"].as() + ) + )}; + if (result.has_error()) { + auto const error_code{result.error()}; + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + std::format( + "Failed to create deserializer: {} {}", + error_code.category().name(), + error_code.message() + ) + }; + } + auto data_context = StreamReaderDataContext( + std::move(data_array), + std::move(zstd_decompressor), + std::move(result.value()) + ); + return StructuredIrStreamReader(std::move(data_context), std::move(deserialized_log_events)); +} + +auto StructuredIrStreamReader::get_num_events_buffered() const -> size_t { + return m_deserialized_log_events->size(); +} + +auto StructuredIrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { + SPDLOG_ERROR(cLogLevelFilteringNotSupportedPrompt); + return FilteredLogEventMapTsType{emscripten::val::null()}; +} + +void StructuredIrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) { + if (log_level_filter.isNull()) { + return; + } + SPDLOG_ERROR(cLogLevelFilteringNotSupportedPrompt); +} + +auto StructuredIrStreamReader::deserialize_stream() -> size_t { + if (nullptr == m_stream_reader_data_context) { + return m_deserialized_log_events->size(); + } + + constexpr size_t cDefaultNumReservedLogEvents{500'000}; + m_deserialized_log_events->reserve(cDefaultNumReservedLogEvents); + auto& reader{m_stream_reader_data_context->get_reader()}; + while (true) { + auto result{m_stream_reader_data_context->get_deserializer().deserialize_next_ir_unit(reader + )}; + if (false == result.has_error()) { + continue; + } + auto const error{result.error()}; + if (std::errc::no_message_available == error || std::errc::operation_not_permitted == error) + { + break; + } + if (std::errc::result_out_of_range == error) { + SPDLOG_ERROR("File contains an incomplete IR stream"); + break; + } + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + std::format( + "Failed to deserialize: {}:{}", + error.category().name(), + error.message() + ) + }; + } + m_level_node_id = m_stream_reader_data_context->get_deserializer() + .get_ir_unit_handler() + .get_level_node_id(); + m_timestamp_node_id = m_stream_reader_data_context->get_deserializer() + .get_ir_unit_handler() + .get_timestamp_node_id(); + m_stream_reader_data_context.reset(nullptr); + return m_deserialized_log_events->size(); +} + +auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType { + if (use_filter) { + SPDLOG_ERROR(cLogLevelFilteringNotSupportedPrompt); + return DecodedResultsTsType{emscripten::val::null()}; + } + + if (m_deserialized_log_events->size() < end_idx || begin_idx > end_idx) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + std::string message; + constexpr size_t cDefaultReservedMessageLength{512}; + message.reserve(cDefaultReservedMessageLength); + auto const results{emscripten::val::array()}; + + for (size_t log_event_idx = begin_idx; log_event_idx < end_idx; ++log_event_idx) { + auto const& log_event{m_deserialized_log_events->at(log_event_idx)}; + + auto const json{log_event.serialize_to_json()}; + if (false == json.has_value()) { + SPDLOG_ERROR("Failed to decode message."); + break; + } + + auto const& id_value_pairs{log_event.get_node_id_value_pairs()}; + clp::ffi::value_int_t log_level{static_cast(LogLevel::NONE)}; + if (m_level_node_id.has_value()) { + auto const& log_level_pair{id_value_pairs.at(m_level_node_id.value())}; + log_level = log_level_pair.has_value() + ? log_level_pair.value().get_immutable_view() + : static_cast(LogLevel::NONE); + } + clp::ffi::value_int_t timestamp{0}; + if (m_timestamp_node_id.has_value()) { + auto const& timestamp_pair{id_value_pairs.at(m_timestamp_node_id.value())}; + timestamp = timestamp_pair.has_value() + ? timestamp_pair.value().get_immutable_view() + : 0; + } + + EM_ASM( + { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, + results.as_handle(), + json.value().dump().c_str(), + log_level, + timestamp, + log_event_idx + 1 + ); + } + + return DecodedResultsTsType(results); +} + +StructuredIrStreamReader::StructuredIrStreamReader( + StreamReaderDataContext&& stream_reader_data_context, + std::shared_ptr> deserialized_log_events +) + : m_stream_reader_data_context{std::make_unique< + StreamReaderDataContext>( + std::move(stream_reader_data_context) + )}, + m_deserialized_log_events{std::move(deserialized_log_events)} {} +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp new file mode 100644 index 00000000..cd0df16d --- /dev/null +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp @@ -0,0 +1,164 @@ +#ifndef CLP_FFI_JS_IR_STRUCTUREDIRSTREAMREADER_HPP +#define CLP_FFI_JS_IR_STRUCTUREDIRSTREAMREADER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +namespace clp_ffi_js::ir { +using parsed_tree_node_id_t = std::optional; + +/** + * Class to handle deserialized IR units. + */ +class IrUnitHandler { +public: + IrUnitHandler( + std::shared_ptr> deserialized_log_events, + std::string log_level_key, + std::string timestamp_key + ) + : m_deserialized_log_events{std::move(deserialized_log_events)}, + m_log_level_key{std::move(log_level_key)}, + m_timestamp_key{std::move(timestamp_key)} {} + + // Implements `clp::ffi::ir_stream::IrUnitHandlerInterface` interface + [[nodiscard]] auto handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event + ) -> clp::ffi::ir_stream::IRErrorCode { + m_deserialized_log_events->emplace_back(std::move(log_event)); + + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] static auto handle_utc_offset_change( + [[maybe_unused]] clp::UtcOffset utc_offset_old, + [[maybe_unused]] clp::UtcOffset utc_offset_new + ) -> clp::ffi::ir_stream::IRErrorCode { + SPDLOG_WARN("UTC offset change packets are currently not handled."); + + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] auto handle_schema_tree_node_insertion( + [[maybe_unused]] clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator + ) -> clp::ffi::ir_stream::IRErrorCode { + ++m_current_node_id; + + auto const& key_name{schema_tree_node_locator.get_key_name()}; + if (m_log_level_key == key_name) { + m_level_node_id.emplace(m_current_node_id); + } else if (m_timestamp_key == key_name) { + m_timestamp_node_id.emplace(m_current_node_id); + } + + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] static auto handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode { + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + // Methods + [[nodiscard]] auto get_level_node_id() const -> parsed_tree_node_id_t { + return m_level_node_id; + } + + [[nodiscard]] auto get_timestamp_node_id() const -> parsed_tree_node_id_t { + return m_timestamp_node_id; + } + +private: + std::string m_log_level_key; + std::string m_timestamp_key; + + clp::ffi::SchemaTree::Node::id_t m_current_node_id{clp::ffi::SchemaTree::cRootId}; + parsed_tree_node_id_t m_level_node_id; + parsed_tree_node_id_t m_timestamp_node_id; + + std::shared_ptr> m_deserialized_log_events; + bool m_is_complete{false}; +}; + +using StructuredIrDeserializer = clp::ffi::ir_stream::Deserializer; + +/** + * Class to deserialize and decode Zstd-compressed CLP structured IR streams, as well as format + * decoded log events. + */ +class StructuredIrStreamReader : public StreamReader { +public: + // Destructor + ~StructuredIrStreamReader() override = default; + + // Disable copy constructor and assignment operator + StructuredIrStreamReader(StructuredIrStreamReader const&) = delete; + auto operator=(StructuredIrStreamReader const&) -> StructuredIrStreamReader& = delete; + + // Define default move constructor + StructuredIrStreamReader(StructuredIrStreamReader&&) = default; + // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. + auto operator=(StructuredIrStreamReader&&) -> StructuredIrStreamReader& = delete; + + /** + * @param zstd_decompressor A decompressor for an IR stream, where the read head of the stream + * is just after the stream's encoding type. + * @param data_array The array backing `zstd_decompressor`. + * @return The created instance. + * @throw ClpFfiJsException if any error occurs. + */ + [[nodiscard]] static auto create( + std::unique_ptr&& zstd_decompressor, + clp::Array data_array, + ReaderOptions const& reader_options + ) -> StructuredIrStreamReader; + + [[nodiscard]] auto get_num_events_buffered() const -> size_t override; + + [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override; + + void filter_log_events(LogLevelFilterTsType const& log_level_filter) override; + + /** + * @see StreamReader::deserialize_stream + * + * After the stream has been exhausted, it will be deallocated. + * + * @return @see StreamReader::deserialize_stream + */ + [[nodiscard]] auto deserialize_stream() -> size_t override; + + [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType override; + +private: + // Constructor + explicit StructuredIrStreamReader( + StreamReaderDataContext&& stream_reader_data_context, + std::shared_ptr> deserialized_log_events + ); + + // Variables + std::shared_ptr> m_deserialized_log_events; + std::unique_ptr> m_stream_reader_data_context; + + parsed_tree_node_id_t m_level_node_id; + parsed_tree_node_id_t m_timestamp_node_id; +}; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_IR_STRUCTUREDIRSTREAMREADER_HPP diff --git a/src/submodules/clp b/src/submodules/clp index e1f3f2ab..bf3d1973 160000 --- a/src/submodules/clp +++ b/src/submodules/clp @@ -1 +1 @@ -Subproject commit e1f3f2abe3473324b19d66e22c182ec3ac0d408f +Subproject commit bf3d1973750fbd6f21f1d7d7bc292278dc4a8c8a