diff --git a/CMakeLists.txt b/CMakeLists.txt index bffcbcdd..53adc553 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -89,6 +89,7 @@ target_link_options( -sEXPORT_ES6 -sMODULARIZE -sWASM_BIGINT + -fwasm-exceptions --emit-tsd ${CLP_FFI_JS_BIN_NAME}.d.ts ) target_link_libraries(${CLP_FFI_JS_BIN_NAME} PRIVATE embind) @@ -112,10 +113,19 @@ target_include_directories( target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/) -set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp) +set(CLP_FFI_JS_SRC_MAIN + src/clp_ffi_js/bindings.cpp + src/clp_ffi_js/ir/KVPairIRStreamReader.cpp + src/clp_ffi_js/ir/IRStreamReader.cpp + src/clp_ffi_js/ir/StreamReader.cpp +) set(CLP_FFI_JS_SRC_CLP_CORE src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp + src/submodules/clp/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp + src/submodules/clp/components/core/src/clp/ffi/ir_stream/utils.cpp + src/submodules/clp/components/core/src/clp/ffi/KeyValuePairLogEvent.cpp + src/submodules/clp/components/core/src/clp/ffi/SchemaTree.cpp src/submodules/clp/components/core/src/clp/ir/EncodedTextAst.cpp src/submodules/clp/components/core/src/clp/ir/LogEventDeserializer.cpp src/submodules/clp/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp diff --git a/src/clp_ffi_js/bindings.cpp b/src/clp_ffi_js/bindings.cpp new file mode 100644 index 00000000..f39ded8f --- /dev/null +++ b/src/clp_ffi_js/bindings.cpp @@ -0,0 +1,60 @@ +#include +#include "clp_ffi_js/ir/StreamReader.hpp" +#include "clp_ffi_js/ir/KVPairIRStreamReader.hpp" +#include "clp_ffi_js/ir/IRStreamReader.hpp" + +namespace { + EMSCRIPTEN_BINDINGS(ClpIrStreamReader) { + emscripten::register_type("Uint8Array"); + emscripten::register_type( + "Array<[string, number, number, number]>" + ); + emscripten::register_type("number[] | null"); + emscripten::register_type("interface{logLevelKey?: string, timestampKey?: string}"); + + emscripten::class_>("ClpIrStreamReader") + .constructor( + &clp_ffi_js::ir::IRStreamReader::create, + emscripten::return_value_policy::take_ownership() + ) + .function( + "getNumEventsBuffered", + &clp_ffi_js::ir::IRStreamReader::get_num_events_buffered + ) + .function( + "getFilteredLogEventMap", + &clp_ffi_js::ir::IRStreamReader::get_filtered_log_event_map + ) + .function("filterLogEvents", &clp_ffi_js::ir::IRStreamReader::filter_log_events) + .function("deserializeStream", &clp_ffi_js::ir::IRStreamReader::deserialize_stream) + .function("decodeRange", &clp_ffi_js::ir::IRStreamReader::decode_range); + + emscripten::class_>("ClpKVPairIRStreamReader") + .constructor( + &clp_ffi_js::ir::KVPairIRStreamReader::create, + emscripten::return_value_policy::take_ownership() + ) + .function( + "getNumEventsBuffered", + &clp_ffi_js::ir::KVPairIRStreamReader::get_num_events_buffered + ) + .function( + "getFilteredLogEventMap", + &clp_ffi_js::ir::KVPairIRStreamReader::get_filtered_log_event_map + ) + .function("filterLogEvents", &clp_ffi_js::ir::KVPairIRStreamReader::filter_log_events) + .function( + "deserializeStream", + &clp_ffi_js::ir::KVPairIRStreamReader::deserialize_stream + ) + .function("decodeRange", &clp_ffi_js::ir::KVPairIRStreamReader::decode_range); + + emscripten::class_("ClpStreamReader") + .constructor( + &clp_ffi_js::ir::StreamReader::create, + emscripten::return_value_policy::take_ownership() + ); + } +} // namespace diff --git a/src/clp_ffi_js/ir/IRStreamReader.cpp b/src/clp_ffi_js/ir/IRStreamReader.cpp new file mode 100644 index 00000000..bb8ee5ce --- /dev/null +++ b/src/clp_ffi_js/ir/IRStreamReader.cpp @@ -0,0 +1,253 @@ +#include "IRStreamReader.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +using namespace std::literals::string_literals; +using clp::ir::four_byte_encoded_variable_t; + +namespace clp_ffi_js::ir { +auto IRStreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options) -> IRStreamReader { + auto const length{data_array["length"].as()}; + SPDLOG_INFO("IRStreamReader::create: got buffer of length={}", length); + + // Copy array from JavaScript to C++ + clp::Array data_buffer{length}; + // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast) + emscripten::val::module_property("HEAPU8") + .call("set", data_array, reinterpret_cast(data_buffer.data())); + // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast) + + auto zstd_decompressor{std::make_unique()}; + zstd_decompressor->open(data_buffer.data(), length); + + bool is_four_bytes_encoding{true}; + if (auto const err{ + clp::ffi::ir_stream::get_encoding_type(*zstd_decompressor, is_four_bytes_encoding) + }; + clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err) + { + SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_MetadataCorrupted, + __FILENAME__, + __LINE__, + "Failed to decode encoding type." + }; + } + if (false == is_four_bytes_encoding) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + "IR stream uses unsupported encoding." + }; + } + + auto result{ + clp::ir::LogEventDeserializer::create(*zstd_decompressor) + }; + if (result.has_error()) { + auto const error_code{result.error()}; + SPDLOG_CRITICAL( + "Failed to create deserializer: {}:{}", + error_code.category().name(), + error_code.message() + ); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + "Failed to create deserializer" + }; + } + + StreamReaderDataContext> stream_reader_data_context{ + std::move(data_buffer), + std::move(zstd_decompressor), + std::move(result.value()) + }; + return IRStreamReader{std::move(stream_reader_data_context), std::move(reader_options)}; +} + +auto IRStreamReader::get_num_events_buffered() const -> size_t { + return m_encoded_log_events.size(); +} + +auto IRStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { + if (false == m_filtered_log_event_map.has_value()) { + return FilteredLogEventMapTsType{emscripten::val::null()}; + } + + return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; +} + +void IRStreamReader::filter_log_events(emscripten::val const& log_level_filter) { + if (log_level_filter.isNull()) { + m_filtered_log_event_map.reset(); + return; + } + + m_filtered_log_event_map.emplace(); + auto filter_levels{emscripten::vecFromJSArray>(log_level_filter + )}; + for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) { + auto const& log_event = m_encoded_log_events[log_event_idx]; + if (std::ranges::find( + filter_levels, + clp::enum_to_underlying_type(log_event.get_log_level()) + ) + != filter_levels.end()) + { + m_filtered_log_event_map->emplace_back(log_event_idx); + } + } +} + +auto IRStreamReader::deserialize_stream() -> size_t { + if (nullptr == m_stream_reader_data_context) { + return m_encoded_log_events.size(); + } + + constexpr size_t cDefaultNumReservedLogEvents{500'000}; + m_encoded_log_events.reserve(cDefaultNumReservedLogEvents); + + while (true) { + auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()}; + if (result.has_error()) { + auto const error{result.error()}; + if (std::errc::no_message_available == error) { + break; + } + if (std::errc::result_out_of_range == error) { + SPDLOG_ERROR("File contains an incomplete IR stream"); + break; + } + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + "Failed to deserialize: "s + error.category().name() + ":" + error.message() + }; + } + auto const& log_event = result.value(); + auto const& message = log_event.get_message(); + + auto const& logtype = message.get_logtype(); + constexpr size_t cLogLevelPositionInMessages{1}; + LogLevel log_level{LogLevel::NONE}; + if (logtype.length() > cLogLevelPositionInMessages) { + // NOLINTNEXTLINE(readability-qualified-auto) + auto const log_level_name_it{std::find_if( + cLogLevelNames.begin() + static_cast(cValidLogLevelsBeginIdx), + cLogLevelNames.end(), + [&](std::string_view level) { + return logtype.substr(cLogLevelPositionInMessages).starts_with(level); + } + )}; + if (log_level_name_it != cLogLevelNames.end()) { + log_level = static_cast( + std::distance(cLogLevelNames.begin(), log_level_name_it) + ); + } + } + + auto log_viewer_event{LogEventWithLevel>( + std::move(log_event), + log_level + )}; + m_encoded_log_events.emplace_back(std::move(log_viewer_event)); + } + m_stream_reader_data_context.reset(nullptr); + return m_encoded_log_events.size(); +} + +auto IRStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType { + if (use_filter && false == m_filtered_log_event_map.has_value()) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + size_t length{0}; + if (use_filter) { + length = m_filtered_log_event_map->size(); + } else { + length = m_encoded_log_events.size(); + } + if (length < end_idx || begin_idx > end_idx) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + std::string message; + constexpr size_t cDefaultReservedMessageLength{512}; + message.reserve(cDefaultReservedMessageLength); + auto const results{emscripten::val::array()}; + + for (size_t i = begin_idx; i < end_idx; ++i) { + size_t log_event_idx{0}; + if (use_filter) { + log_event_idx = m_filtered_log_event_map->at(i); + } else { + log_event_idx = i; + } + auto const& log_event_with_level{m_encoded_log_events[log_event_idx]}; + auto const& log_event{log_event_with_level.get_log_event()}; + + auto const parsed{log_event.get_message().decode_and_unparse()}; + if (false == parsed.has_value()) { + SPDLOG_ERROR("Failed to decode message."); + break; + } + message = parsed.value(); + + m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message); + + EM_ASM( + { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, + results.as_handle(), + message.c_str(), + log_event.get_timestamp(), + log_event_with_level.get_log_level(), + log_event_idx + 1 + ); + } + + return DecodedResultsTsType(results); +} + + IRStreamReader::IRStreamReader( + StreamReaderDataContext>&& stream_reader_data_context, ReaderOptions const& reader_options +) + : m_stream_reader_data_context{std::make_unique< + StreamReaderDataContext>>( + std::move(stream_reader_data_context) + )}, + m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/IRStreamReader.hpp b/src/clp_ffi_js/ir/IRStreamReader.hpp new file mode 100644 index 00000000..dd37135f --- /dev/null +++ b/src/clp_ffi_js/ir/IRStreamReader.hpp @@ -0,0 +1,104 @@ +#ifndef CLP_FFI_JS_IR_IR_STREAM_READER_HPP +#define CLP_FFI_JS_IR_IR_STREAM_READER_HPP + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace clp_ffi_js::ir { +/** + * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded + * log events. + */ +class IRStreamReader: public StreamReader { +public: + /** + * Creates a IRStreamReader to read from the given array. + * + * @param data_array An array containing a Zstandard-compressed IR stream. + * @return The created instance. + * @throw ClpFfiJsException if any error occurs. + */ + [[nodiscard]] static auto create(DataArrayTsType const& data_array, ReaderOptions const& reader_options) -> IRStreamReader; + + // Destructor + ~IRStreamReader() = default; + + // Disable copy constructor and assignment operator + IRStreamReader(IRStreamReader const&) = delete; + auto operator=(IRStreamReader const&) -> IRStreamReader& = delete; + + // Define default move constructor + IRStreamReader(IRStreamReader&&) = default; + // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. + auto operator=(IRStreamReader&&) -> IRStreamReader& = delete; + + /** + * @return The number of events buffered. + */ + [[nodiscard]] auto get_num_events_buffered() const -> size_t; + + /** + * @return The filtered log events map. + */ + [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType; + + /** + * Generates a filtered collection from all log events. + * + * @param log_level_filter Array of selected log levels + */ + void filter_log_events(emscripten::val const& log_level_filter); + + /** + * Deserializes all log events in the stream. After the stream has been exhausted, it will be + * deallocated. + * + * @return The number of successfully deserialized ("valid") log events. + */ + [[nodiscard]] auto deserialize_stream() -> size_t; + + /** + * Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered + * (depending on the value of `useFilter`) log events collection. + * + * @param begin_idx + * @param end_idx + * @param use_filter Whether to decode from the filtered or unfiltered log events collection. + * @return An array where each element is a decoded log event represented by an array of: + * - The log event's message + * - The log event's timestamp as milliseconds since the Unix epoch + * - The log event's log level as an integer that indexes into `cLogLevelNames` + * - The log event's number (1-indexed) in the stream + * @return null if any log event in the range doesn't exist (e.g. the range exceeds the number + * of log events in the collection). + */ + [[nodiscard]] auto + decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType; + +private: + // Constructor + explicit IRStreamReader(StreamReaderDataContext>&& + stream_reader_data_context, ReaderOptions const& reader_options); + + // Variables + std::vector>> m_encoded_log_events; + std::unique_ptr>> + m_stream_reader_data_context; + FilteredLogEventsMap m_filtered_log_event_map; + clp::TimestampPattern m_ts_pattern; +}; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_IR_IR_STREAM_READER_HPP diff --git a/src/clp_ffi_js/ir/KVPairIRStreamReader.cpp b/src/clp_ffi_js/ir/KVPairIRStreamReader.cpp new file mode 100644 index 00000000..38608c20 --- /dev/null +++ b/src/clp_ffi_js/ir/KVPairIRStreamReader.cpp @@ -0,0 +1,217 @@ +#include "KVPairIRStreamReader.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace std::literals::string_literals; +using clp::ir::four_byte_encoded_variable_t; + +namespace clp_ffi_js::ir { +auto KVPairIRStreamReader::create( + DataArrayTsType const& data_array, + ReaderOptions const& reader_options +) -> KVPairIRStreamReader { + auto const length{data_array["length"].as()}; + SPDLOG_INFO("KVPairIRStreamReader::create: got buffer of length={}", length); + + // Copy array from JavaScript to C++ + clp::Array data_buffer{length}; + // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast) + emscripten::val::module_property("HEAPU8") + .call("set", data_array, reinterpret_cast(data_buffer.data())); + // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast) + + auto zstd_decompressor{std::make_unique()}; + zstd_decompressor->open(data_buffer.data(), length); + + auto deserialized_log_events{std::make_shared>()}; + + auto result{clp::ffi::ir_stream::Deserializer::create( + *zstd_decompressor, + IrUnitHandler( + *deserialized_log_events, + reader_options["logLevelKey"].as(), + reader_options["timestampKey"].as() + ) + )}; + if (result.has_error()) { + auto const error_code{result.error()}; + SPDLOG_CRITICAL( + "Failed to create deserializer: {}:{}", + error_code.category().name(), + error_code.message() + ); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + "Failed to create deserializer" + }; + } + + StreamReaderDataContext stream_reader_data_context{ + std::move(data_buffer), + std::move(zstd_decompressor), + std::move(result.value()) + }; + return KVPairIRStreamReader{ + std::move(stream_reader_data_context), + std::move(deserialized_log_events) + }; +} + +auto KVPairIRStreamReader::get_num_events_buffered() const -> size_t { + return m_deserialized_log_events->size(); +} + +auto KVPairIRStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { + if (false == m_filtered_log_event_map.has_value()) { + return FilteredLogEventMapTsType{emscripten::val::null()}; + } + + return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; +} + +auto KVPairIRStreamReader::filter_log_events(emscripten::val const& log_level_filter) -> void { + if (log_level_filter.isNull() || false == m_level_node_id.has_value()) { + m_filtered_log_event_map.reset(); + return; + } + + m_filtered_log_event_map.emplace(); + auto filter_levels{emscripten::vecFromJSArray>(log_level_filter + )}; + for (size_t log_event_idx = 0; log_event_idx < m_deserialized_log_events->size(); + ++log_event_idx) + { + auto const& log_event{m_deserialized_log_events->at(log_event_idx)}; + auto const& id_value_pairs{log_event.get_node_id_value_pairs()}; + auto const& log_level{id_value_pairs.at(m_level_node_id.value())}; + if (std::ranges::find( + filter_levels, + log_level.value().get_immutable_view() + ) + != filter_levels.end()) + { + m_filtered_log_event_map->emplace_back(log_event_idx); + } + } +} + +auto KVPairIRStreamReader::deserialize_stream() -> size_t { + if (nullptr != m_stream_reader_data_context) { + constexpr size_t cDefaultNumReservedLogEvents{500'000}; + m_deserialized_log_events->reserve(cDefaultNumReservedLogEvents); + auto& reader{m_stream_reader_data_context->get_reader()}; + clp::ffi::SchemaTree::Node::id_t log_level_node_id{clp::ffi::SchemaTree::cRootId}; + while (true) { + auto result{m_stream_reader_data_context->get_deserializer().deserialize_next_ir_unit( + reader + )}; + if (false == result.has_error()) { + continue; + } + auto const error{result.error()}; + if (std::errc::no_message_available == error + || std::errc::operation_not_permitted == error) + { + break; + } + if (std::errc::result_out_of_range == error) { + SPDLOG_ERROR("File contains an incomplete IR stream"); + break; + } + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + "Failed to deserialize: "s + error.category().name() + ":" + error.message() + }; + } + m_stream_reader_data_context.reset(nullptr); + } + + return m_deserialized_log_events->size(); +} + +auto KVPairIRStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType { + if (use_filter && false == m_filtered_log_event_map.has_value()) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + size_t length + = use_filter ? m_filtered_log_event_map->size() : m_deserialized_log_events->size(); + if (length < end_idx || begin_idx > end_idx) { + return DecodedResultsTsType(emscripten::val::null()); + } + + auto const results{emscripten::val::array()}; + for (size_t i = begin_idx; i < end_idx; ++i) { + size_t log_event_idx = use_filter ? m_filtered_log_event_map->at(i) : i; + auto const& log_event{m_deserialized_log_events->at(log_event_idx)}; + + auto const json{log_event.serialize_to_json()}; + if (false == json.has_value()) { + SPDLOG_ERROR("Failed to decode message."); + break; + } + + auto const& id_value_pairs{log_event.get_node_id_value_pairs()}; + clp::ffi::value_int_t log_level{static_cast(LogLevel::NONE)}; + if (m_level_node_id.has_value()) { + auto const& log_level_pair{id_value_pairs.at(m_level_node_id.value())}; + log_level = log_level_pair.has_value() + ? log_level_pair.value().get_immutable_view() + : static_cast(LogLevel::NONE); + } + clp::ffi::value_int_t timestamp{0}; + if (m_timestamp_node_id.has_value()) { + auto const& timestamp_pair{id_value_pairs.at(m_timestamp_node_id.value())}; + timestamp = timestamp_pair.has_value() + ? timestamp_pair.value().get_immutable_view() + : 0; + } + + EM_ASM( + { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, + results.as_handle(), + json.value().dump().c_str(), + log_level, + timestamp, + log_event_idx + 1 + ); + } + + return DecodedResultsTsType(results); +} + +KVPairIRStreamReader::KVPairIRStreamReader( + StreamReaderDataContext&& stream_reader_data_context, + std::shared_ptr> deserialized_log_events +) + : m_stream_reader_data_context{std::make_unique>( + std::move(stream_reader_data_context) + )}, + m_deserialized_log_events{std::move(deserialized_log_events)} {} +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/KVPairIRStreamReader.hpp b/src/clp_ffi_js/ir/KVPairIRStreamReader.hpp new file mode 100644 index 00000000..5e9f1492 --- /dev/null +++ b/src/clp_ffi_js/ir/KVPairIRStreamReader.hpp @@ -0,0 +1,183 @@ +#ifndef CLP_FFI_JS_KV_PAIR_IR_STREAM_READER_HPP +#define CLP_FFI_JS_KV_PAIR_IR_STREAM_READER_HPP + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +namespace clp_ffi_js::ir { +using parsed_tree_node_id_t = std::optional; + +class IrUnitHandler { +public: + IrUnitHandler( + std::vector& deserialized_log_events, + std::string log_level_key, + std::string timestamp_key + ) + : m_deserialized_log_events{deserialized_log_events}, + m_log_level_key{std::move(log_level_key)}, + m_timestamp_key{std::move(timestamp_key)} {} + + // Implements `clp::ffi::ir_stream::IrUnitHandlerInterface` interface + [[nodiscard]] auto handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event + ) -> clp::ffi::ir_stream::IRErrorCode { + m_deserialized_log_events.emplace_back(std::move(log_event)); + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] static auto handle_utc_offset_change( + [[maybe_unused]] clp::UtcOffset utc_offset_old, + [[maybe_unused]] clp::UtcOffset utc_offset_new + ) -> clp::ffi::ir_stream::IRErrorCode { + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] auto handle_schema_tree_node_insertion( + [[maybe_unused]] clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator + ) -> clp::ffi::ir_stream::IRErrorCode { + ++m_current_node_id; + auto const& key_name{schema_tree_node_locator.get_key_name()}; + SPDLOG_DEBUG("m_current_node_id={}, key_name={}", m_current_node_id, key_name); + + if (m_log_level_key == key_name) { + m_level_node_id.emplace(m_current_node_id); + } else if (m_timestamp_key == key_name) { + m_timestamp_node_id.emplace(m_current_node_id); + } + + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] auto handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode { + m_is_complete = true; + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + // Methods + [[nodiscard]] auto is_complete() const -> bool { return m_is_complete; } + + [[nodiscard]] auto get_deserialized_log_events( + ) const -> std::vector const& { + return m_deserialized_log_events; + } + + [[nodiscard]] auto get_level_node_id() const -> parsed_tree_node_id_t { + return m_level_node_id; + } + + [[nodiscard]] auto get_timestamp_node_id() const -> parsed_tree_node_id_t { + return m_timestamp_node_id; + } + +private: + std::string m_log_level_key; + std::string m_timestamp_key; + + // the root node has id=0 + clp::ffi::SchemaTree::Node::id_t m_current_node_id; + parsed_tree_node_id_t m_level_node_id; + parsed_tree_node_id_t m_timestamp_node_id; + + std::vector& m_deserialized_log_events; + bool m_is_complete{false}; +}; + +/** + * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded + * log events. + */ +class KVPairIRStreamReader : public StreamReader { +public: + /** + * Creates a StreamReader to read from the given array. + * + * @param data_array An array containing a Zstandard-compressed IR stream. + * @return The created instance. + * @throw ClpFfiJsException if any error occurs. + */ + [[nodiscard]] static auto create( + DataArrayTsType const& data_array, + ReaderOptions const& reader_options + ) -> KVPairIRStreamReader; + + // Destructor + ~KVPairIRStreamReader() override = default; + + // Disable copy constructor and assignment operator + KVPairIRStreamReader(KVPairIRStreamReader const&) = delete; + auto operator=(KVPairIRStreamReader const&) -> KVPairIRStreamReader& = delete; + + // Define default move constructor + KVPairIRStreamReader(KVPairIRStreamReader&&) = default; + // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. + auto operator=(KVPairIRStreamReader&&) -> KVPairIRStreamReader& = delete; + + /** + * @return The number of events buffered. + */ + [[nodiscard]] auto get_num_events_buffered() const -> size_t override; + + [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override; + + auto filter_log_events(emscripten::val const& log_level_filter) -> void override; + /** + * Deserializes and buffers log events in the range `[beginIdx, endIdx)`. After the stream has + * been exhausted, it will be deallocated. + * + * NOTE: Currently, this class only supports deserializing the full range of log events in the + * stream. + * + * @param begin_idx + * @param end_idx + * @return The number of successfully deserialized ("valid") log events. + */ + [[nodiscard]] auto deserialize_stream() -> size_t override; + + /** + * Decodes the deserialized log events in the range `[beginIdx, endIdx)`. + * + * @param begin_idx + * @param end_idx + * @return An array where each element is a decoded log event represented by an array of: + * - The log event's message + * - The log event's timestamp as milliseconds since the Unix epoch + * - The log event's log level as an integer that indexes into `cLogLevelNames` + * - The log event's number (1-indexed) in the stream + * @return null if any log event in the range doesn't exist (e.g., the range exceeds the number + * of log events in the file). + */ + [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType override; + +private: + using deserializer_t = clp::ffi::ir_stream::Deserializer; + + // Constructor + KVPairIRStreamReader( + StreamReaderDataContext&& stream_reader_data_context, + std::shared_ptr> deserialized_log_events + ); + + // Variables + std::shared_ptr> m_deserialized_log_events; + std::unique_ptr> m_stream_reader_data_context; + + parsed_tree_node_id_t m_level_node_id; + parsed_tree_node_id_t m_timestamp_node_id; + FilteredLogEventsMap m_filtered_log_event_map; +}; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_KV_PAIR_IR_STREAM_READER_HPP diff --git a/src/clp_ffi_js/ir/LogEventWithLevel.hpp b/src/clp_ffi_js/ir/LogEventWithLevel.hpp index 22a00af3..95e4b834 100644 --- a/src/clp_ffi_js/ir/LogEventWithLevel.hpp +++ b/src/clp_ffi_js/ir/LogEventWithLevel.hpp @@ -18,22 +18,19 @@ namespace clp_ffi_js::ir { * IR log event will contain a set of key-value pairs, one of which should be the log level. * @tparam encoded_variable_t The type of encoded variables in the event */ -template -class LogEventWithLevel : public clp::ir::LogEvent { +template +class LogEventWithLevel { public: // Constructors - LogEventWithLevel( - clp::ir::epoch_time_ms_t timestamp, - clp::UtcOffset utc_offset, - clp::ir::EncodedTextAst message, - LogLevel log_level - ) - : clp::ir::LogEvent{timestamp, utc_offset, std::move(message)}, + explicit LogEventWithLevel(log_event_t log_event, LogLevel log_level) + : m_log_event{std::move(log_event)}, m_log_level{log_level} {} + [[nodiscard]] auto get_log_event() const -> const log_event_t& { return m_log_event; } [[nodiscard]] auto get_log_level() const -> LogLevel { return m_log_level; } private: + log_event_t m_log_event; LogLevel m_log_level; }; } // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index b9c86b6b..a1feeb5e 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -1,42 +1,32 @@ #include "StreamReader.hpp" -#include #include #include -#include +#include +#include #include -#include #include #include -#include -#include +#include #include #include #include #include #include -#include -#include #include #include -#include -#include #include #include #include -#include -#include -#include - -using namespace std::literals::string_literals; -using clp::ir::four_byte_encoded_variable_t; +#include +#include namespace clp_ffi_js::ir { -auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader { +auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options) -> std::unique_ptr { auto const length{data_array["length"].as()}; - SPDLOG_INFO("StreamReader::create: got buffer of length={}", length); + SPDLOG_INFO("KVPairIRStreamReader::create: got buffer of length={}", length); // Copy array from JavaScript to C++ clp::Array data_buffer{length}; @@ -48,234 +38,49 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader { auto zstd_decompressor{std::make_unique()}; zstd_decompressor->open(data_buffer.data(), length); - bool is_four_bytes_encoding{true}; - if (auto const err{ - clp::ffi::ir_stream::get_encoding_type(*zstd_decompressor, is_four_bytes_encoding) - }; - clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err) - { - SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err); - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_MetadataCorrupted, - __FILENAME__, - __LINE__, - "Failed to decode encoding type." - }; - } - if (false == is_four_bytes_encoding) { + bool is_four_byte_encoding{}; + auto const get_encoding_type_result{ + clp::ffi::ir_stream::get_encoding_type(*zstd_decompressor, is_four_byte_encoding) + }; + if (clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != get_encoding_type_result) { + SPDLOG_CRITICAL("Failed to get encoding type: {}", get_encoding_type_result); throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Unsupported, + clp::ErrorCode::ErrorCode_Failure, __FILENAME__, __LINE__, - "IR stream uses unsupported encoding." + "Failed to get encoding type." }; } - - auto result{ - clp::ir::LogEventDeserializer::create(*zstd_decompressor) - }; - if (result.has_error()) { - auto const error_code{result.error()}; + clp::ffi::ir_stream::encoded_tag_t metadata_type{}; + std::vector metadata_bytes; + auto const deserialize_preamble_result{clp::ffi::ir_stream::deserialize_preamble( + *zstd_decompressor, + metadata_type, + metadata_bytes + )}; + if (clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != deserialize_preamble_result) { SPDLOG_CRITICAL( - "Failed to create deserializer: {}:{}", - error_code.category().name(), - error_code.message() + "Failed to deserialize preamble for version reading: {}", + deserialize_preamble_result ); throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Failure, __FILENAME__, __LINE__, - "Failed to create deserializer" + "Failed to deserialize preamble for version reading." }; } - - StreamReaderDataContext stream_reader_data_context{ - std::move(data_buffer), - std::move(zstd_decompressor), - std::move(result.value()) + std::string_view const metadata_view{ + clp::size_checked_pointer_cast(metadata_bytes.data()), + metadata_bytes.size() }; - return StreamReader{std::move(stream_reader_data_context)}; -} - -auto StreamReader::get_num_events_buffered() const -> size_t { - return m_encoded_log_events.size(); -} - -auto StreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { - if (false == m_filtered_log_event_map.has_value()) { - return FilteredLogEventMapTsType{emscripten::val::null()}; - } - - return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; -} - -void StreamReader::filter_log_events(emscripten::val const& log_level_filter) { - if (log_level_filter.isNull()) { - m_filtered_log_event_map.reset(); - return; + nlohmann::json const metadata = nlohmann::json::parse(metadata_view); + auto const& version{metadata.at(clp::ffi::ir_stream::cProtocol::Metadata::VersionKey)}; + SPDLOG_INFO("The version is {}", version); + if (version == "v0.0.0") { + return std::make_unique(IRStreamReader::create(data_array, std::move(reader_options))); } - m_filtered_log_event_map.emplace(); - auto filter_levels{emscripten::vecFromJSArray>(log_level_filter - )}; - for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) { - auto const& log_event = m_encoded_log_events[log_event_idx]; - if (std::ranges::find( - filter_levels, - clp::enum_to_underlying_type(log_event.get_log_level()) - ) - != filter_levels.end()) - { - m_filtered_log_event_map->emplace_back(log_event_idx); - } - } + return std::make_unique(KVPairIRStreamReader::create(data_array, std::move(reader_options))); } - -auto StreamReader::deserialize_stream() -> size_t { - if (nullptr == m_stream_reader_data_context) { - return m_encoded_log_events.size(); - } - - constexpr size_t cDefaultNumReservedLogEvents{500'000}; - m_encoded_log_events.reserve(cDefaultNumReservedLogEvents); - - while (true) { - auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()}; - if (result.has_error()) { - auto const error{result.error()}; - if (std::errc::no_message_available == error) { - break; - } - if (std::errc::result_out_of_range == error) { - SPDLOG_ERROR("File contains an incomplete IR stream"); - break; - } - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Corrupt, - __FILENAME__, - __LINE__, - "Failed to deserialize: "s + error.category().name() + ":" + error.message() - }; - } - auto const& log_event = result.value(); - auto const& message = log_event.get_message(); - - auto const& logtype = message.get_logtype(); - constexpr size_t cLogLevelPositionInMessages{1}; - LogLevel log_level{LogLevel::NONE}; - if (logtype.length() > cLogLevelPositionInMessages) { - // NOLINTNEXTLINE(readability-qualified-auto) - auto const log_level_name_it{std::find_if( - cLogLevelNames.begin() + static_cast(cValidLogLevelsBeginIdx), - cLogLevelNames.end(), - [&](std::string_view level) { - return logtype.substr(cLogLevelPositionInMessages).starts_with(level); - } - )}; - if (log_level_name_it != cLogLevelNames.end()) { - log_level = static_cast( - std::distance(cLogLevelNames.begin(), log_level_name_it) - ); - } - } - - auto log_viewer_event{LogEventWithLevel( - log_event.get_timestamp(), - log_event.get_utc_offset(), - message, - log_level - )}; - m_encoded_log_events.emplace_back(std::move(log_viewer_event)); - } - m_stream_reader_data_context.reset(nullptr); - return m_encoded_log_events.size(); -} - -auto StreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const - -> DecodedResultsTsType { - if (use_filter && false == m_filtered_log_event_map.has_value()) { - return DecodedResultsTsType{emscripten::val::null()}; - } - - size_t length{0}; - if (use_filter) { - length = m_filtered_log_event_map->size(); - } else { - length = m_encoded_log_events.size(); - } - if (length < end_idx || begin_idx > end_idx) { - return DecodedResultsTsType{emscripten::val::null()}; - } - - std::string message; - constexpr size_t cDefaultReservedMessageLength{512}; - message.reserve(cDefaultReservedMessageLength); - auto const results{emscripten::val::array()}; - - for (size_t i = begin_idx; i < end_idx; ++i) { - size_t log_event_idx{0}; - if (use_filter) { - log_event_idx = m_filtered_log_event_map->at(i); - } else { - log_event_idx = i; - } - auto const& log_event{m_encoded_log_events[log_event_idx]}; - - auto const parsed{log_event.get_message().decode_and_unparse()}; - if (false == parsed.has_value()) { - SPDLOG_ERROR("Failed to decode message."); - break; - } - message = parsed.value(); - - m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message); - - EM_ASM( - { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, - results.as_handle(), - message.c_str(), - log_event.get_timestamp(), - log_event.get_log_level(), - log_event_idx + 1 - ); - } - - return DecodedResultsTsType(results); -} - -StreamReader::StreamReader( - StreamReaderDataContext&& stream_reader_data_context -) - : m_stream_reader_data_context{std::make_unique< - StreamReaderDataContext>( - std::move(stream_reader_data_context) - )}, - m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} } // namespace clp_ffi_js::ir - -namespace { -EMSCRIPTEN_BINDINGS(ClpIrStreamReader) { - emscripten::register_type("Uint8Array"); - emscripten::register_type( - "Array<[string, number, number, number]>" - ); - emscripten::register_type("number[] | null"); - - emscripten::class_("ClpIrStreamReader") - .constructor( - &clp_ffi_js::ir::StreamReader::create, - emscripten::return_value_policy::take_ownership() - ) - .function( - "getNumEventsBuffered", - &clp_ffi_js::ir::StreamReader::get_num_events_buffered - ) - .function( - "getFilteredLogEventMap", - &clp_ffi_js::ir::StreamReader::get_filtered_log_event_map - ) - .function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events) - .function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream) - .function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range); -} -} // namespace diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index dec6c360..248ea659 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -3,21 +3,15 @@ #include #include -#include -#include -#include -#include #include #include -#include -#include - namespace clp_ffi_js::ir { EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); +EMSCRIPTEN_DECLARE_VAL_TYPE(ReaderOptions); /** * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded @@ -31,51 +25,35 @@ class StreamReader { */ using FilteredLogEventsMap = std::optional>; - /** - * Creates a StreamReader to read from the given array. - * - * @param data_array An array containing a Zstandard-compressed IR stream. - * @return The created instance. - * @throw ClpFfiJsException if any error occurs. - */ - [[nodiscard]] static auto create(DataArrayTsType const& data_array) -> StreamReader; - - // Destructor - ~StreamReader() = default; - - // Disable copy constructor and assignment operator - StreamReader(StreamReader const&) = delete; - auto operator=(StreamReader const&) -> StreamReader& = delete; + virtual ~StreamReader() = default; - // Define default move constructor - StreamReader(StreamReader&&) = default; - // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. - auto operator=(StreamReader&&) -> StreamReader& = delete; + [[nodiscard]] static auto create(DataArrayTsType const& data_array, ReaderOptions const& reader_options + ) -> std::unique_ptr; /** * @return The number of events buffered. */ - [[nodiscard]] auto get_num_events_buffered() const -> size_t; + [[nodiscard]] virtual auto get_num_events_buffered() const -> size_t = 0; /** * @return The filtered log events map. */ - [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType; + [[nodiscard]] virtual auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType = 0; /** * Generates a filtered collection from all log events. * * @param log_level_filter Array of selected log levels */ - void filter_log_events(emscripten::val const& log_level_filter); + virtual auto filter_log_events(emscripten::val const& log_level_filter) -> void = 0; /** - * Deserializes all log events in the stream. After the stream has been exhausted, it will be + * Deserializes all log events in the file. After the stream has been exhausted, it will be * deallocated. * * @return The number of successfully deserialized ("valid") log events. */ - [[nodiscard]] auto deserialize_stream() -> size_t; + [[nodiscard]] virtual auto deserialize_stream() -> size_t = 0; /** * Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered @@ -83,7 +61,8 @@ class StreamReader { * * @param begin_idx * @param end_idx - * @param use_filter Whether to decode from the filtered or unfiltered log events collection. + * @param use_filter If true, decode from the filtered log events collection; otherwise, decode + * from the unfiltered one. * @return An array where each element is a decoded log event represented by an array of: * - The log event's message * - The log event's timestamp as milliseconds since the Unix epoch @@ -92,20 +71,8 @@ class StreamReader { * @return null if any log event in the range doesn't exist (e.g. the range exceeds the number * of log events in the collection). */ - [[nodiscard]] auto - decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType; - -private: - // Constructor - explicit StreamReader(StreamReaderDataContext&& - stream_reader_data_context); - - // Variables - std::vector> m_encoded_log_events; - std::unique_ptr> - m_stream_reader_data_context; - FilteredLogEventsMap m_filtered_log_event_map; - clp::TimestampPattern m_ts_pattern; + [[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType = 0; }; } // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp index 091b0b05..04a90a1a 100644 --- a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp +++ b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp @@ -5,25 +5,24 @@ #include #include -#include -#include #include namespace clp_ffi_js::ir { /** * The data context for a `StreamReader`. It encapsulates a chain of the following resources: - * A `clp::ir::LogEventDeserializer` that reads from a + * A `clp::ir::LogEventDeserializer` / `clp::ffi::ir_stream::Deserializer` that reads from a * `clp::streaming_compression::zstd::Decompressor`, which in turn reads from a `clp::Array`. - * @tparam encoded_variable_t Type of encoded variables encoded in the stream. + * + * @tparam deserializer_t Type of deserializer for decoding the stream. */ -template +template class StreamReaderDataContext { public: // Constructors StreamReaderDataContext( clp::Array&& data_buffer, std::unique_ptr&& zstd_decompressor, - clp::ir::LogEventDeserializer deserializer + deserializer_t deserializer ) : m_data_buffer{std::move(data_buffer)}, m_zstd_decompressor{std::move(zstd_decompressor)}, @@ -42,16 +41,21 @@ class StreamReaderDataContext { // Methods /** - * @return A reference to the deserializer. + * @return A reference to the reader. */ - [[nodiscard]] auto get_deserializer() -> clp::ir::LogEventDeserializer& { - return m_deserializer; + [[nodiscard]] auto get_reader() const -> clp::streaming_compression::zstd::Decompressor& { + return *m_zstd_decompressor; } + /** + * @return A reference to the deserializer. + */ + [[nodiscard]] auto get_deserializer() -> deserializer_t& { return m_deserializer; } + private: clp::Array m_data_buffer; std::unique_ptr m_zstd_decompressor; - clp::ir::LogEventDeserializer m_deserializer; + deserializer_t m_deserializer; }; } // namespace clp_ffi_js::ir diff --git a/src/submodules/clp b/src/submodules/clp index 86299ca2..51931f69 160000 --- a/src/submodules/clp +++ b/src/submodules/clp @@ -1 +1 @@ -Subproject commit 86299ca2907565e09cb10c2ddd3661ad1ceb6cb0 +Subproject commit 51931f69bff0d7bdd8d8488465453e4d3f6be90b diff --git a/tools/yscope-dev-utils b/tools/yscope-dev-utils index 159768c7..ad576e43 160000 --- a/tools/yscope-dev-utils +++ b/tools/yscope-dev-utils @@ -1 +1 @@ -Subproject commit 159768c7d171595ed2cba17b758c10043a2efe96 +Subproject commit ad576e43c1a43d7a6afde79fc9c3c952b7bf28bd