diff --git a/CMakeLists.txt b/CMakeLists.txt index 204352ce..73ad4966 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -83,11 +83,16 @@ endif() set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp + src/clp_ffi_js/ir/StructuredIrStreamReader.cpp src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp ) set(CLP_FFI_JS_SRC_CLP_CORE src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp + src/submodules/clp/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp + src/submodules/clp/components/core/src/clp/ffi/ir_stream/utils.cpp + src/submodules/clp/components/core/src/clp/ffi/KeyValuePairLogEvent.cpp + src/submodules/clp/components/core/src/clp/ffi/SchemaTree.cpp src/submodules/clp/components/core/src/clp/ir/EncodedTextAst.cpp src/submodules/clp/components/core/src/clp/ir/LogEventDeserializer.cpp src/submodules/clp/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index c6b097cb..e7d43d19 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -1,6 +1,5 @@ #include "StreamReader.hpp" -#include #include #include #include @@ -23,6 +22,7 @@ #include #include +#include #include namespace { @@ -117,8 +117,12 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) { // JS types used as inputs emscripten::register_type("Uint8Array"); emscripten::register_type("number[] | null"); + emscripten::register_type("{timestampKey: string} | null"); // JS types used as outputs + emscripten::enum_("IrStreamType") + .value("STRUCTURED", clp_ffi_js::ir::StreamType::Structured) + .value("UNSTRUCTURED", clp_ffi_js::ir::StreamType::Unstructured); emscripten::register_type( "Array<[string, number, number, number]>" ); @@ -128,6 +132,7 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) { &clp_ffi_js::ir::StreamReader::create, emscripten::return_value_policy::take_ownership() ) + .function("getIrStreamType", &clp_ffi_js::ir::StreamReader::get_ir_stream_type) .function( "getNumEventsBuffered", &clp_ffi_js::ir::StreamReader::get_num_events_buffered @@ -143,7 +148,8 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) { } // namespace namespace clp_ffi_js::ir { -auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr { +auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options) + -> std::unique_ptr { auto const length{data_array["length"].as()}; SPDLOG_INFO("StreamReader::create: got buffer of length={}", length); @@ -159,20 +165,30 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr< rewind_reader_and_validate_encoding_type(*zstd_decompressor); - // Validate the stream's version + // Validate the stream's version and decide which type of IR stream reader to create. auto pos = zstd_decompressor->get_pos(); auto const version{get_version(*zstd_decompressor)}; - if (std::ranges::find(cUnstructuredIrVersions, version) == cUnstructuredIrVersions.end()) { - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Unsupported, - __FILENAME__, - __LINE__, - std::format("Unable to create reader for IR stream with version {}.", version) - }; - } try { - zstd_decompressor->seek_from_begin(pos); - } catch (ZstdDecompressor::OperationFailed& e) { + auto const version_validation_result{clp::ffi::ir_stream::validate_protocol_version(version) + }; + if (clp::ffi::ir_stream::IRProtocolErrorCode::Supported == version_validation_result) { + zstd_decompressor->seek_from_begin(0); + return std::make_unique(StructuredIrStreamReader::create( + std::move(zstd_decompressor), + std::move(data_buffer), + reader_options + )); + } + if (clp::ffi::ir_stream::IRProtocolErrorCode::BackwardCompatible + == version_validation_result) + { + zstd_decompressor->seek_from_begin(pos); + return std::make_unique(UnstructuredIrStreamReader::create( + std::move(zstd_decompressor), + std::move(data_buffer) + )); + } + } catch (ZstdDecompressor::OperationFailed const& e) { throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Failure, __FILENAME__, @@ -181,8 +197,11 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr< }; } - return std::make_unique( - UnstructuredIrStreamReader::create(std::move(zstd_decompressor), std::move(data_buffer)) - ); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + std::format("Unable to create reader for IR stream with version {}.", version) + }; } } // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index 5f298674..06e7c094 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -1,10 +1,9 @@ #ifndef CLP_FFI_JS_IR_STREAMREADER_HPP #define CLP_FFI_JS_IR_STREAMREADER_HPP -#include #include +#include #include -#include #include #include @@ -13,13 +12,16 @@ namespace clp_ffi_js::ir { // JS types used as inputs EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(LogLevelFilterTsType); +EMSCRIPTEN_DECLARE_VAL_TYPE(ReaderOptions); // JS types used as outputs EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); -constexpr std::array cUnstructuredIrVersions - = {"v0.0.2", "v0.0.1", "v0.0.0", "0.0.2", "0.0.1", "0.0.0"}; +enum class StreamType : uint8_t { + Structured, + Unstructured, +}; /** * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded @@ -36,7 +38,9 @@ class StreamReader { * @return The created instance. * @throw ClpFfiJsException if any error occurs. */ - [[nodiscard]] static auto create(DataArrayTsType const& data_array + [[nodiscard]] static auto create( + DataArrayTsType const& data_array, + ReaderOptions const& reader_options ) -> std::unique_ptr; // Destructor @@ -52,6 +56,8 @@ class StreamReader { auto operator=(StreamReader&&) -> StreamReader& = delete; // Methods + [[nodiscard]] virtual auto get_ir_stream_type() const -> StreamType = 0; + /** * @return The number of events buffered. */ diff --git a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp index c32ad35c..536c6b2b 100644 --- a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp +++ b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp @@ -39,11 +39,10 @@ class StreamReaderDataContext { ~StreamReaderDataContext() = default; // Methods - /** - * @return A reference to the deserializer. - */ [[nodiscard]] auto get_deserializer() -> Deserializer& { return m_deserializer; } + [[nodiscard]] auto get_reader() -> clp::ReaderInterface& { return *m_reader; } + private: clp::Array m_data_buffer; std::unique_ptr m_reader; diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp new file mode 100644 index 00000000..799da91c --- /dev/null +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp @@ -0,0 +1,195 @@ +#include "StructuredIrStreamReader.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace clp_ffi_js::ir { +namespace { +constexpr std::string_view cEmptyJsonStr{"{}"}; +constexpr std::string_view cLogLevelFilteringNotSupportedErrorMsg{ + "Log level filtering is not yet supported in this reader." +}; +constexpr std::string_view cReaderOptionsTimestampKey{"timestampKey"}; +} // namespace + +using clp::ir::four_byte_encoded_variable_t; + +auto StructuredIrStreamReader::create( + std::unique_ptr&& zstd_decompressor, + clp::Array data_array, + ReaderOptions const& reader_options +) -> StructuredIrStreamReader { + auto deserialized_log_events{std::make_shared>()}; + auto result{StructuredIrDeserializer::create( + *zstd_decompressor, + IrUnitHandler{ + deserialized_log_events, + reader_options[cReaderOptionsTimestampKey.data()].as() + } + )}; + if (result.has_error()) { + auto const error_code{result.error()}; + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + std::format( + "Failed to create deserializer: {} {}", + error_code.category().name(), + error_code.message() + ) + }; + } + StreamReaderDataContext data_context{ + std::move(data_array), + std::move(zstd_decompressor), + std::move(result.value()) + }; + return StructuredIrStreamReader{std::move(data_context), std::move(deserialized_log_events)}; +} + +auto StructuredIrStreamReader::get_num_events_buffered() const -> size_t { + return m_deserialized_log_events->size(); +} + +auto StructuredIrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { + SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg); + return FilteredLogEventMapTsType{emscripten::val::null()}; +} + +void StructuredIrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) { + if (log_level_filter.isNull()) { + return; + } + SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg); +} + +auto StructuredIrStreamReader::deserialize_stream() -> size_t { + if (nullptr == m_stream_reader_data_context) { + return m_deserialized_log_events->size(); + } + + constexpr size_t cDefaultNumReservedLogEvents{500'000}; + m_deserialized_log_events->reserve(cDefaultNumReservedLogEvents); + auto& reader{m_stream_reader_data_context->get_reader()}; + while (true) { + auto result{m_stream_reader_data_context->get_deserializer().deserialize_next_ir_unit(reader + )}; + if (false == result.has_error()) { + continue; + } + auto const error{result.error()}; + if (std::errc::operation_not_permitted == error) { + break; + } + if (std::errc::result_out_of_range == error) { + SPDLOG_ERROR("File contains an incomplete IR stream"); + break; + } + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + std::format( + "Failed to deserialize IR unit: {}:{}", + error.category().name(), + error.message() + ) + }; + } + m_timestamp_node_id = m_stream_reader_data_context->get_deserializer() + .get_ir_unit_handler() + .get_timestamp_node_id(); + m_stream_reader_data_context.reset(nullptr); + return m_deserialized_log_events->size(); +} + +auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType { + if (use_filter) { + SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg); + return DecodedResultsTsType{emscripten::val::null()}; + } + + if (m_deserialized_log_events->size() < end_idx || begin_idx > end_idx) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + auto const results{emscripten::val::array()}; + + for (size_t log_event_idx = begin_idx; log_event_idx < end_idx; ++log_event_idx) { + auto const& log_event{m_deserialized_log_events->at(log_event_idx)}; + + auto const json_result{log_event.serialize_to_json()}; + std::string json_str{cEmptyJsonStr}; + if (false == json_result.has_value()) { + auto error_code{json_result.error()}; + SPDLOG_ERROR( + "Failed to deserialize log event to JSON: {}:{}", + error_code.category().name(), + error_code.message() + ); + } else { + json_str = json_result.value().dump(); + } + + auto const& id_value_pairs{log_event.get_node_id_value_pairs()}; + clp::ffi::value_int_t timestamp{0}; + if (m_timestamp_node_id.has_value()) { + auto const& timestamp_pair{id_value_pairs.at(m_timestamp_node_id.value())}; + if (timestamp_pair.has_value()) { + if (timestamp_pair->is()) { + timestamp = timestamp_pair.value().get_immutable_view(); + } else { + // TODO: Add support for parsing timestamp values of string type. + SPDLOG_ERROR("Unable to parse timestamp for log_event_idx={}", log_event_idx); + } + } + } + + EM_ASM( + { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, + results.as_handle(), + json_str.c_str(), + timestamp, + LogLevel::NONE, + log_event_idx + 1 + ); + } + + return DecodedResultsTsType(results); +} + +StructuredIrStreamReader::StructuredIrStreamReader( + StreamReaderDataContext&& stream_reader_data_context, + std::shared_ptr> deserialized_log_events +) + : m_deserialized_log_events{std::move(deserialized_log_events)}, + m_stream_reader_data_context{ + std::make_unique>( + std::move(stream_reader_data_context) + ) + } {} +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp new file mode 100644 index 00000000..e93dee03 --- /dev/null +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp @@ -0,0 +1,189 @@ +#ifndef CLP_FFI_JS_IR_STRUCTUREDIRSTREAMREADER_HPP +#define CLP_FFI_JS_IR_STRUCTUREDIRSTREAMREADER_HPP + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace clp_ffi_js::ir { +using schema_tree_node_id_t = std::optional; + +/** + * Class that implements the `clp::ffi::ir_stream::IrUnitHandlerInterface` to buffer log events and + * determine the schema-tree node ID of the timestamp kv-pair. + */ +class IrUnitHandler { +public: + /** + * @param deserialized_log_events The vector in which to store deserialized log events. + * @param timestamp_key Key name of schema-tree node that contains the authoritative timestamp + * for events. + */ + IrUnitHandler( + std::shared_ptr> deserialized_log_events, + std::string timestamp_key + ) + : m_timestamp_key{std::move(timestamp_key)}, + m_deserialized_log_events{std::move(deserialized_log_events)} {} + + // Methods implementing `clp::ffi::ir_stream::IrUnitHandlerInterface`. + /** + * Buffers the log event. + * @param log_event + * @return IRErrorCode::IRErrorCode_Success + */ + [[nodiscard]] auto handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event + ) -> clp::ffi::ir_stream::IRErrorCode { + m_deserialized_log_events->emplace_back(std::move(log_event)); + + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + /** + * @param utc_offset_old + * @param utc_offset_new + * @return IRErrorCode::IRErrorCode_Success + */ + [[nodiscard]] static auto handle_utc_offset_change( + [[maybe_unused]] clp::UtcOffset utc_offset_old, + [[maybe_unused]] clp::UtcOffset utc_offset_new + ) -> clp::ffi::ir_stream::IRErrorCode { + SPDLOG_WARN("UTC offset change packets aren't handled currently."); + + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + /** + * Saves the node's ID if it corresponds to events' authoritative timestamp kv-pair. + * @param schema_tree_node_locator + * @return IRErrorCode::IRErrorCode_Success + */ + [[nodiscard]] auto handle_schema_tree_node_insertion( + clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator + ) -> clp::ffi::ir_stream::IRErrorCode { + ++m_current_node_id; + + auto const& key_name{schema_tree_node_locator.get_key_name()}; + if (m_timestamp_key == key_name) { + m_timestamp_node_id.emplace(m_current_node_id); + } + + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + /** + * @return IRErrorCode::IRErrorCode_Success + */ + [[nodiscard]] static auto handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode { + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + // Methods + /** + * @return The schema-tree node ID associated with events' authoritative timestamp key. + */ + [[nodiscard]] auto get_timestamp_node_id() const -> schema_tree_node_id_t { + return m_timestamp_node_id; + } + +private: + // Variables + std::string m_timestamp_key; + + clp::ffi::SchemaTree::Node::id_t m_current_node_id{clp::ffi::SchemaTree::cRootId}; + + schema_tree_node_id_t m_timestamp_node_id; + + // TODO: Technically, we don't need to use a `shared_ptr` since the parent stream reader will + // have a longer lifetime than this class. Instead, we could use `gsl::not_null` once we add + // `gsl` into the project. + std::shared_ptr> m_deserialized_log_events; +}; + +using StructuredIrDeserializer = clp::ffi::ir_stream::Deserializer; + +/** + * Class to deserialize and decode Zstd-compressed CLP structured IR streams, as well as format + * decoded log events. + */ +class StructuredIrStreamReader : public StreamReader { +public: + /** + * @param zstd_decompressor A decompressor for an IR stream, where the read head of the stream + * is just after the stream's encoding type. + * @param data_array The array backing `zstd_decompressor`. + * @param reader_options + * @return The created instance. + * @throw ClpFfiJsException if any error occurs. + */ + [[nodiscard]] static auto create( + std::unique_ptr&& zstd_decompressor, + clp::Array data_array, + ReaderOptions const& reader_options + ) -> StructuredIrStreamReader; + + // Destructor + ~StructuredIrStreamReader() override = default; + + // Disable copy constructor and assignment operator + StructuredIrStreamReader(StructuredIrStreamReader const&) = delete; + auto operator=(StructuredIrStreamReader const&) -> StructuredIrStreamReader& = delete; + + // Define default move constructor + StructuredIrStreamReader(StructuredIrStreamReader&&) = default; + // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. + auto operator=(StructuredIrStreamReader&&) -> StructuredIrStreamReader& = delete; + + [[nodiscard]] auto get_ir_stream_type() const -> StreamType override { + return StreamType::Structured; + } + + [[nodiscard]] auto get_num_events_buffered() const -> size_t override; + + [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override; + + void filter_log_events(LogLevelFilterTsType const& log_level_filter) override; + + /** + * @see StreamReader::deserialize_stream + * + * After the stream has been exhausted, it will be deallocated. + * + * @return @see StreamReader::deserialize_stream + */ + [[nodiscard]] auto deserialize_stream() -> size_t override; + + [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType override; + +private: + // Constructor + explicit StructuredIrStreamReader( + StreamReaderDataContext&& stream_reader_data_context, + std::shared_ptr> deserialized_log_events + ); + + // Variables + std::shared_ptr> m_deserialized_log_events; + std::unique_ptr> m_stream_reader_data_context; + + schema_tree_node_id_t m_timestamp_node_id; +}; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_IR_STRUCTUREDIRSTREAMREADER_HPP diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp index 709e9a26..b4f22107 100644 --- a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp @@ -56,6 +56,10 @@ class UnstructuredIrStreamReader : public StreamReader { clp::Array data_array ) -> UnstructuredIrStreamReader; + [[nodiscard]] auto get_ir_stream_type() const -> StreamType override { + return StreamType::Unstructured; + } + [[nodiscard]] auto get_num_events_buffered() const -> size_t override; [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override; diff --git a/src/submodules/clp b/src/submodules/clp index e1f3f2ab..31de766e 160000 --- a/src/submodules/clp +++ b/src/submodules/clp @@ -1 +1 @@ -Subproject commit e1f3f2abe3473324b19d66e22c182ec3ac0d408f +Subproject commit 31de766ecc3175b1fa472d12881587e3673294de