diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f4f8bc5..0afef14b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -85,6 +85,7 @@ endif() set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp src/clp_ffi_js/ir/StructuredIrStreamReader.cpp + src/clp_ffi_js/ir/StructuredIrUnitHandler.cpp src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp ) diff --git a/src/clp_ffi_js/constants.hpp b/src/clp_ffi_js/constants.hpp index 045227da..c49829c5 100644 --- a/src/clp_ffi_js/constants.hpp +++ b/src/clp_ffi_js/constants.hpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace clp_ffi_js { /** @@ -17,6 +18,7 @@ enum class LogLevel : std::uint8_t { WARN, ERROR, FATAL, + LENGTH, // This isn't a valid log level. }; constexpr LogLevel cValidLogLevelsBeginIdx{LogLevel::TRACE}; @@ -25,15 +27,16 @@ constexpr LogLevel cValidLogLevelsBeginIdx{LogLevel::TRACE}; * * NOTE: These must be kept in sync manually. */ -constexpr std::array cLogLevelNames{ - "NONE", // This isn't a valid log level. - "TRACE", - "DEBUG", - "INFO", - "WARN", - "ERROR", - "FATAL", -}; +constexpr std::array + cLogLevelNames{ + "NONE", // This isn't a valid log level. + "TRACE", + "DEBUG", + "INFO", + "WARN", + "ERROR", + "FATAL", + }; } // namespace clp_ffi_js #endif // CLP_FFI_JS_CONSTANTS_HPP diff --git a/src/clp_ffi_js/ir/LogEventWithFilterData.hpp b/src/clp_ffi_js/ir/LogEventWithFilterData.hpp index 1ff5f07b..96710a03 100644 --- a/src/clp_ffi_js/ir/LogEventWithFilterData.hpp +++ b/src/clp_ffi_js/ir/LogEventWithFilterData.hpp @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -12,6 +13,7 @@ namespace clp_ffi_js::ir { using clp::ir::four_byte_encoded_variable_t; using UnstructuredLogEvent = clp::ir::LogEvent; +using StructuredLogEvent = clp::ffi::KeyValuePairLogEvent; /** * A templated class that extends a log event type with processed versions of some of its fields, @@ -21,7 +23,7 @@ using UnstructuredLogEvent = clp::ir::LogEvent; * @tparam LogEvent The type of the log event. 
*/ template -requires std::same_as +requires std::same_as || std::same_as class LogEventWithFilterData { public: // Constructor diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index d12c8761..be8c011b 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -117,7 +117,9 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) { // JS types used as inputs emscripten::register_type("Uint8Array"); emscripten::register_type("number[] | null"); - emscripten::register_type("{timestampKey: string} | null"); + emscripten::register_type( + "{logLevelKey: string, timestampKey: string} | null" + ); // JS types used as outputs emscripten::enum_("IrStreamType") diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index 06e7c094..6d483e89 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -1,12 +1,24 @@ #ifndef CLP_FFI_JS_IR_STREAMREADER_HPP #define CLP_FFI_JS_IR_STREAMREADER_HPP +#include +#include #include #include #include +#include +#include +#include +#include #include +#include +#include #include +#include + +#include +#include namespace clp_ffi_js::ir { // JS types used as inputs @@ -23,6 +35,15 @@ enum class StreamType : uint8_t { Unstructured, }; +template +using LogEvents = std::vector>; + +/** + * Mapping between an index in the filtered log events collection to an index in the unfiltered + * log events collection. + */ +using FilteredLogEventsMap = std::optional>; + /** * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded * log events. @@ -35,6 +56,7 @@ class StreamReader { * Creates a `StreamReader` to read from the given array. * * @param data_array An array containing a Zstandard-compressed IR stream. + * @param reader_options * @return The created instance. * @throw ClpFfiJsException if any error occurs. */ @@ -79,6 +101,7 @@ class StreamReader { * Deserializes all log events in the stream. 
* * @return The number of successfully deserialized ("valid") log events. + * @throw ClpFfiJsException if an error occurs during deserialization. */ [[nodiscard]] virtual auto deserialize_stream() -> size_t = 0; @@ -96,13 +119,145 @@ class StreamReader { * - The log event's number (1-indexed) in the stream * @return null if any log event in the range doesn't exist (e.g. the range exceeds the number * of log events in the collection). + * @throw ClpFfiJsException if a message cannot be decoded. */ [[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType = 0; protected: explicit StreamReader() = default; + + /** + * Templated implementation of `decode_range` that uses `log_event_to_string` to convert + * `log_event` to a string for the returned result. + * + * @tparam LogEvent + * @tparam ToStringFunc Function to convert a log event into a string. + * @param begin_idx + * @param end_idx + * @param filtered_log_event_map + * @param log_events + * @param log_event_to_string + * @param use_filter + * @return See `decode_range`. + * @throws Propagates `ToStringFunc`'s exceptions. + */ + template + requires requires(ToStringFunc func, LogEvent const& log_event) { + { + func(log_event) + } -> std::convertible_to; + } + static auto generic_decode_range( + size_t begin_idx, + size_t end_idx, + FilteredLogEventsMap const& filtered_log_event_map, + LogEvents const& log_events, + ToStringFunc log_event_to_string, + bool use_filter + ) -> DecodedResultsTsType; + + /** + * Templated implementation of `filter_log_events`. + * + * @tparam LogEvent + * @param[out] filtered_log_event_map Returns the indices in `log_events` of the events whose + * log level is in `log_level_filter`; reset if the filter is null. + * @param log_level_filter + * @param log_events Derived class's log events. 
+ */ + template + static auto generic_filter_log_events( + FilteredLogEventsMap& filtered_log_event_map, + LogLevelFilterTsType const& log_level_filter, + LogEvents const& log_events + ) -> void; }; + +template +requires requires(ToStringFunc func, LogEvent const& log_event) { + { + func(log_event) + } -> std::convertible_to; +} +auto StreamReader::generic_decode_range( + size_t begin_idx, + size_t end_idx, + FilteredLogEventsMap const& filtered_log_event_map, + LogEvents const& log_events, + ToStringFunc log_event_to_string, + bool use_filter +) -> DecodedResultsTsType { + if (use_filter && false == filtered_log_event_map.has_value()) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + size_t length{0}; + if (use_filter) { + length = filtered_log_event_map->size(); + } else { + length = log_events.size(); + } + if (length < end_idx || begin_idx > end_idx) { + SPDLOG_ERROR("Invalid log event index range: {}-{}", begin_idx, end_idx); + return DecodedResultsTsType{emscripten::val::null()}; + } + + auto const results{emscripten::val::array()}; + + for (size_t i = begin_idx; i < end_idx; ++i) { + size_t log_event_idx{0}; + if (use_filter) { + log_event_idx = filtered_log_event_map->at(i); + } else { + log_event_idx = i; + } + + auto const& log_event_with_filter_data{log_events.at(log_event_idx)}; + auto const& log_event = log_event_with_filter_data.get_log_event(); + auto const& timestamp = log_event_with_filter_data.get_timestamp(); + auto const& log_level = log_event_with_filter_data.get_log_level(); + + EM_ASM( + { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, + results.as_handle(), + log_event_to_string(log_event).c_str(), + timestamp, + log_level, + log_event_idx + 1 + ); + } + + return DecodedResultsTsType(results); +} + +template +auto StreamReader::generic_filter_log_events( + FilteredLogEventsMap& filtered_log_event_map, + LogLevelFilterTsType const& log_level_filter, + LogEvents const& log_events +) -> void { + if 
(log_level_filter.isNull()) { + filtered_log_event_map.reset(); + return; + } + + filtered_log_event_map.emplace(); + auto filter_levels + = emscripten::vecFromJSArray>(log_level_filter); + + for (size_t log_event_idx = 0; log_event_idx < log_events.size(); ++log_event_idx) { + auto const& log_event = log_events[log_event_idx]; + if (std::ranges::find( + filter_levels, + clp::enum_to_underlying_type(log_event.get_log_level()) + ) + != filter_levels.end()) + { + filtered_log_event_map->emplace_back(log_event_idx); + } + } +} } // namespace clp_ffi_js::ir #endif // CLP_FFI_JS_IR_STREAMREADER_HPP diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp index 799da91c..f12b66b4 100644 --- a/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.cpp @@ -7,45 +7,38 @@ #include #include #include -#include #include #include #include -#include -#include -#include #include -#include #include #include #include -#include +#include #include #include +#include namespace clp_ffi_js::ir { namespace { constexpr std::string_view cEmptyJsonStr{"{}"}; -constexpr std::string_view cLogLevelFilteringNotSupportedErrorMsg{ - "Log level filtering is not yet supported in this reader." 
-}; +constexpr std::string_view cReaderOptionsLogLevelKey{"logLevelKey"}; constexpr std::string_view cReaderOptionsTimestampKey{"timestampKey"}; } // namespace -using clp::ir::four_byte_encoded_variable_t; - auto StructuredIrStreamReader::create( std::unique_ptr&& zstd_decompressor, clp::Array data_array, ReaderOptions const& reader_options ) -> StructuredIrStreamReader { - auto deserialized_log_events{std::make_shared>()}; + auto deserialized_log_events{std::make_shared()}; auto result{StructuredIrDeserializer::create( *zstd_decompressor, - IrUnitHandler{ + StructuredIrUnitHandler{ deserialized_log_events, + reader_options[cReaderOptionsLogLevelKey.data()].as(), reader_options[cReaderOptionsTimestampKey.data()].as() } )}; @@ -75,15 +68,19 @@ auto StructuredIrStreamReader::get_num_events_buffered() const -> size_t { } auto StructuredIrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { - SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg); - return FilteredLogEventMapTsType{emscripten::val::null()}; + if (false == m_filtered_log_event_map.has_value()) { + return FilteredLogEventMapTsType{emscripten::val::null()}; + } + + return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; } void StructuredIrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) { - if (log_level_filter.isNull()) { - return; - } - SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg); + generic_filter_log_events( + m_filtered_log_event_map, + log_level_filter, + *m_deserialized_log_events + ); } auto StructuredIrStreamReader::deserialize_stream() -> size_t { @@ -94,16 +91,14 @@ auto StructuredIrStreamReader::deserialize_stream() -> size_t { constexpr size_t cDefaultNumReservedLogEvents{500'000}; m_deserialized_log_events->reserve(cDefaultNumReservedLogEvents); auto& reader{m_stream_reader_data_context->get_reader()}; - while (true) { - auto 
result{m_stream_reader_data_context->get_deserializer().deserialize_next_ir_unit(reader - )}; + auto& deserializer = m_stream_reader_data_context->get_deserializer(); + + while (false == deserializer.is_stream_completed()) { + auto result{deserializer.deserialize_next_ir_unit(reader)}; if (false == result.has_error()) { continue; } auto const error{result.error()}; - if (std::errc::operation_not_permitted == error) { - break; - } if (std::errc::result_out_of_range == error) { SPDLOG_ERROR("File contains an incomplete IR stream"); break; @@ -119,31 +114,15 @@ auto StructuredIrStreamReader::deserialize_stream() -> size_t { ) }; } - m_timestamp_node_id = m_stream_reader_data_context->get_deserializer() - .get_ir_unit_handler() - .get_timestamp_node_id(); m_stream_reader_data_context.reset(nullptr); return m_deserialized_log_events->size(); } auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType { - if (use_filter) { - SPDLOG_ERROR(cLogLevelFilteringNotSupportedErrorMsg); - return DecodedResultsTsType{emscripten::val::null()}; - } - - if (m_deserialized_log_events->size() < end_idx || begin_idx > end_idx) { - return DecodedResultsTsType{emscripten::val::null()}; - } - - auto const results{emscripten::val::array()}; - - for (size_t log_event_idx = begin_idx; log_event_idx < end_idx; ++log_event_idx) { - auto const& log_event{m_deserialized_log_events->at(log_event_idx)}; - + auto log_event_to_string = [](StructuredLogEvent const& log_event) -> std::string { + std::string json_str; auto const json_result{log_event.serialize_to_json()}; - std::string json_str{cEmptyJsonStr}; if (false == json_result.has_value()) { auto error_code{json_result.error()}; SPDLOG_ERROR( @@ -151,40 +130,26 @@ auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bo error_code.category().name(), error_code.message() ); + json_str = std::string(cEmptyJsonStr); } else { json_str = 
json_result.value().dump(); } + return json_str; + }; - auto const& id_value_pairs{log_event.get_node_id_value_pairs()}; - clp::ffi::value_int_t timestamp{0}; - if (m_timestamp_node_id.has_value()) { - auto const& timestamp_pair{id_value_pairs.at(m_timestamp_node_id.value())}; - if (timestamp_pair.has_value()) { - if (timestamp_pair->is()) { - timestamp = timestamp_pair.value().get_immutable_view(); - } else { - // TODO: Add support for parsing timestamp values of string type. - SPDLOG_ERROR("Unable to parse timestamp for log_event_idx={}", log_event_idx); - } - } - } - - EM_ASM( - { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, - results.as_handle(), - json_str.c_str(), - timestamp, - LogLevel::NONE, - log_event_idx + 1 - ); - } - - return DecodedResultsTsType(results); + return generic_decode_range( + begin_idx, + end_idx, + m_filtered_log_event_map, + *m_deserialized_log_events, + log_event_to_string, + use_filter + ); } StructuredIrStreamReader::StructuredIrStreamReader( StreamReaderDataContext&& stream_reader_data_context, - std::shared_ptr> deserialized_log_events + std::shared_ptr deserialized_log_events ) : m_deserialized_log_events{std::move(deserialized_log_events)}, m_stream_reader_data_context{ diff --git a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp index e93dee03..6ccd5255 100644 --- a/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp +++ b/src/clp_ffi_js/ir/StructuredIrStreamReader.hpp @@ -4,118 +4,21 @@ #include #include #include -#include -#include -#include #include -#include #include -#include #include -#include #include -#include +#include #include #include +#include namespace clp_ffi_js::ir { using schema_tree_node_id_t = std::optional; - -/** - * Class that implements the `clp::ffi::ir_stream::IrUnitHandlerInterface` to buffer log events and - * determine the schema-tree node ID of the timestamp kv-pair. 
- */ -class IrUnitHandler { -public: - /** - * @param deserialized_log_events The vector in which to store deserialized log events. - * @param timestamp_key Key name of schema-tree node that contains the authoritative timestamp - * for events. - */ - IrUnitHandler( - std::shared_ptr> deserialized_log_events, - std::string timestamp_key - ) - : m_timestamp_key{std::move(timestamp_key)}, - m_deserialized_log_events{std::move(deserialized_log_events)} {} - - // Methods implementing `clp::ffi::ir_stream::IrUnitHandlerInterface`. - /** - * Buffers the log event. - * @param log_event - * @return IRErrorCode::IRErrorCode_Success - */ - [[nodiscard]] auto handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event - ) -> clp::ffi::ir_stream::IRErrorCode { - m_deserialized_log_events->emplace_back(std::move(log_event)); - - return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; - } - - /** - * @param utc_offset_old - * @param utc_offset_new - * @return IRErrorCode::IRErrorCode_Success - */ - [[nodiscard]] static auto handle_utc_offset_change( - [[maybe_unused]] clp::UtcOffset utc_offset_old, - [[maybe_unused]] clp::UtcOffset utc_offset_new - ) -> clp::ffi::ir_stream::IRErrorCode { - SPDLOG_WARN("UTC offset change packets aren't handled currently."); - - return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; - } - - /** - * Saves the node's ID if it corresponds to events' authoritative timestamp kv-pair. 
- * @param schema_tree_node_locator - * @return IRErrorCode::IRErrorCode_Success - */ - [[nodiscard]] auto handle_schema_tree_node_insertion( - clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator - ) -> clp::ffi::ir_stream::IRErrorCode { - ++m_current_node_id; - - auto const& key_name{schema_tree_node_locator.get_key_name()}; - if (m_timestamp_key == key_name) { - m_timestamp_node_id.emplace(m_current_node_id); - } - - return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; - } - - /** - * @return IRErrorCode::IRErrorCode_Success - */ - [[nodiscard]] static auto handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode { - return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; - } - - // Methods - /** - * @return The schema-tree node ID associated with events' authoritative timestamp key. - */ - [[nodiscard]] auto get_timestamp_node_id() const -> schema_tree_node_id_t { - return m_timestamp_node_id; - } - -private: - // Variables - std::string m_timestamp_key; - - clp::ffi::SchemaTree::Node::id_t m_current_node_id{clp::ffi::SchemaTree::cRootId}; - - schema_tree_node_id_t m_timestamp_node_id; - - // TODO: Technically, we don't need to use a `shared_ptr` since the parent stream reader will - // have a longer lifetime than this class. Instead, we could use `gsl::not_null` once we add - // `gsl` into the project. 
- std::shared_ptr> m_deserialized_log_events; -}; - -using StructuredIrDeserializer = clp::ffi::ir_stream::Deserializer; +using StructuredIrDeserializer = clp::ffi::ir_stream::Deserializer; +using StructuredLogEvents = LogEvents; /** * Class to deserialize and decode Zstd-compressed CLP structured IR streams, as well as format @@ -175,14 +78,13 @@ class StructuredIrStreamReader : public StreamReader { // Constructor explicit StructuredIrStreamReader( StreamReaderDataContext&& stream_reader_data_context, - std::shared_ptr> deserialized_log_events + std::shared_ptr deserialized_log_events ); // Variables - std::shared_ptr> m_deserialized_log_events; + std::shared_ptr m_deserialized_log_events; std::unique_ptr> m_stream_reader_data_context; - - schema_tree_node_id_t m_timestamp_node_id; + FilteredLogEventsMap m_filtered_log_event_map; }; } // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StructuredIrUnitHandler.cpp b/src/clp_ffi_js/ir/StructuredIrUnitHandler.cpp new file mode 100644 index 00000000..0bff407c --- /dev/null +++ b/src/clp_ffi_js/ir/StructuredIrUnitHandler.cpp @@ -0,0 +1,160 @@ +#include "StructuredIrUnitHandler.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace clp_ffi_js::ir { +namespace { +/** + * Parses a string to determine the corresponding `LogLevel` enum value. + * @param str + * @return `LogLevel` enum corresponding to `str` if `str` matches a string in `cLogLevelNames`. + * @return `LogLevel::NONE` otherwise. + */ +auto parse_log_level(std::string_view str) -> LogLevel; + +auto parse_log_level(std::string_view str) -> LogLevel { + // Convert the string to uppercase. 
+ std::string log_level_name_upper_case{str}; + std::ranges::transform( + log_level_name_upper_case.begin(), + log_level_name_upper_case.end(), + log_level_name_upper_case.begin(), + [](unsigned char c) { return std::toupper(c); } + ); + + auto const* it = std::ranges::find( + cLogLevelNames.begin() + clp::enum_to_underlying_type(cValidLogLevelsBeginIdx), + cLogLevelNames.end(), + log_level_name_upper_case + ); + if (it == cLogLevelNames.end()) { + return LogLevel::NONE; + } + + return static_cast(std::distance(cLogLevelNames.begin(), it)); +} +} // namespace + +auto StructuredIrUnitHandler::handle_log_event(StructuredLogEvent&& log_event +) -> clp::ffi::ir_stream::IRErrorCode { + auto const& id_value_pairs{log_event.get_node_id_value_pairs()}; + auto const timestamp = get_timestamp(id_value_pairs); + auto const log_level = get_log_level(id_value_pairs); + + m_deserialized_log_events->emplace_back(std::move(log_event), log_level, timestamp); + + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; +} + +auto StructuredIrUnitHandler::handle_utc_offset_change( + [[maybe_unused]] clp::UtcOffset utc_offset_old, + [[maybe_unused]] clp::UtcOffset utc_offset_new +) -> clp::ffi::ir_stream::IRErrorCode { + SPDLOG_WARN("UTC offset change packets aren't handled currently."); + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; +} + +auto StructuredIrUnitHandler::handle_schema_tree_node_insertion( + clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator +) -> clp::ffi::ir_stream::IRErrorCode { + ++m_current_node_id; + + auto const& key_name{schema_tree_node_locator.get_key_name()}; + if (key_name == m_log_level_key) { + m_log_level_node_id.emplace(m_current_node_id); + } else if (key_name == m_timestamp_key) { + m_timestamp_node_id.emplace(m_current_node_id); + } + + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; +} + +auto StructuredIrUnitHandler::handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode { + return 
clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; +} + +auto StructuredIrUnitHandler::get_log_level( + StructuredLogEvent::NodeIdValuePairs const& id_value_pairs +) const -> LogLevel { + LogLevel log_level{LogLevel::NONE}; + + if (false == m_log_level_node_id.has_value()) { + return log_level; + } + auto const& optional_log_level_value{id_value_pairs.at(m_log_level_node_id.value())}; + if (false == optional_log_level_value.has_value()) { + return log_level; + } + auto const log_level_value = optional_log_level_value.value(); + + if (log_level_value.is()) { + auto const& log_level_str = log_level_value.get_immutable_view(); + log_level = parse_log_level(log_level_str); + } else if (log_level_value.is()) { + auto const& log_level_int = log_level_value.get_immutable_view(); + if (log_level_int >= clp::enum_to_underlying_type(cValidLogLevelsBeginIdx) + && log_level_int < clp::enum_to_underlying_type(LogLevel::LENGTH)) + { + log_level = static_cast(log_level_int); + } + } else { + auto log_event_idx = m_deserialized_log_events->size(); + SPDLOG_ERROR( + "Authoritative log level's value is not an int or string for log event index {}", + log_event_idx + ); + } + + return log_level; +} + +auto StructuredIrUnitHandler::get_timestamp( + StructuredLogEvent::NodeIdValuePairs const& id_value_pairs +) const -> clp::ir::epoch_time_ms_t { + clp::ir::epoch_time_ms_t timestamp{0}; + + if (false == m_timestamp_node_id.has_value()) { + return timestamp; + } + auto const& optional_timestamp_value{id_value_pairs.at(m_timestamp_node_id.value())}; + if (false == optional_timestamp_value.has_value()) { + return timestamp; + } + auto const timestamp_value = optional_timestamp_value.value(); + + if (timestamp_value.is()) { + timestamp = static_cast( + timestamp_value.get_immutable_view() + ); + } else { + // TODO: Add support for parsing string-type timestamp values. 
+ auto log_event_idx = m_deserialized_log_events->size(); + SPDLOG_ERROR( + "Authoritative timestamp's value is not an int for log event index {}", + log_event_idx + ); + } + + return timestamp; +} +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StructuredIrUnitHandler.hpp b/src/clp_ffi_js/ir/StructuredIrUnitHandler.hpp new file mode 100644 index 00000000..bd8b6400 --- /dev/null +++ b/src/clp_ffi_js/ir/StructuredIrUnitHandler.hpp @@ -0,0 +1,118 @@ +#ifndef CLP_FFI_JS_IR_STRUCTUREDIRUNITHANDLER_HPP +#define CLP_FFI_JS_IR_STRUCTUREDIRUNITHANDLER_HPP + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +namespace clp_ffi_js::ir { +using schema_tree_node_id_t = std::optional; + +/** + * Class that implements the `clp::ffi::ir_stream::IrUnitHandlerInterface` to buffer log events and + * determine the schema-tree node IDs of the log level and timestamp kv-pairs. + */ +class StructuredIrUnitHandler { +public: + // Constructors + /** + * @param deserialized_log_events The vector in which to store deserialized log events. + * @param log_level_key Key name of schema-tree node that contains the authoritative log level. + * @param timestamp_key Key name of schema-tree node that contains the authoritative timestamp. + */ + StructuredIrUnitHandler( + std::shared_ptr>> + deserialized_log_events, + std::string log_level_key, + std::string timestamp_key + ) + : m_log_level_key{std::move(log_level_key)}, + m_timestamp_key{std::move(timestamp_key)}, + m_deserialized_log_events{std::move(deserialized_log_events)} {} + + // Methods implementing `clp::ffi::ir_stream::IrUnitHandlerInterface`. + /** + * Buffers the log event with filter data extracted. 
+ * @param log_event + * @return IRErrorCode::IRErrorCode_Success + */ + [[nodiscard]] auto handle_log_event(StructuredLogEvent&& log_event + ) -> clp::ffi::ir_stream::IRErrorCode; + + /** + * Dummy implementation that does nothing but conforms to the interface. + * @param utc_offset_old + * @param utc_offset_new + * @return IRErrorCode::IRErrorCode_Success + */ + [[nodiscard]] static auto handle_utc_offset_change( + [[maybe_unused]] clp::UtcOffset utc_offset_old, + [[maybe_unused]] clp::UtcOffset utc_offset_new + ) -> clp::ffi::ir_stream::IRErrorCode; + + /** + * Saves the node's ID if it corresponds to events' authoritative log level or timestamp + * kv-pair. + * @param schema_tree_node_locator + * @return IRErrorCode::IRErrorCode_Success + */ + [[nodiscard]] auto handle_schema_tree_node_insertion( + clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator + ) -> clp::ffi::ir_stream::IRErrorCode; + + /** + * Dummy implementation that does nothing but conforms to the interface. + * @return IRErrorCode::IRErrorCode_Success + */ + [[nodiscard]] static auto handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode; + +private: + // Methods + /** + * @param id_value_pairs + * @return `LogLevel::NONE` if `m_log_level_node_id` is unset, the node has no value, or the + * node's value is not an integer or string. + * @return `LogLevel` from node with id `m_log_level_node_id` otherwise. + */ + [[nodiscard]] auto get_log_level(StructuredLogEvent::NodeIdValuePairs const& id_value_pairs + ) const -> LogLevel; + + /** + * @param id_value_pairs + * @return 0 if `m_timestamp_node_id` is unset, the node has no value, or the node's value is + * not an integer. + * @return Timestamp from node with ID `m_timestamp_node_id` otherwise. 
+ */ + [[nodiscard]] auto get_timestamp(StructuredLogEvent::NodeIdValuePairs const& id_value_pairs + ) const -> clp::ir::epoch_time_ms_t; + + // Variables + std::string m_log_level_key; + std::string m_timestamp_key; + + clp::ffi::SchemaTree::Node::id_t m_current_node_id{clp::ffi::SchemaTree::cRootId}; + + schema_tree_node_id_t m_log_level_node_id; + schema_tree_node_id_t m_timestamp_node_id; + + // TODO: Technically, we don't need to use a `shared_ptr` since the parent stream reader will + // have a longer lifetime than this class. Instead, we could use `gsl::not_null` once we add + // `gsl` into the project. + std::shared_ptr>> + m_deserialized_log_events; +}; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_IR_STRUCTUREDIRUNITHANDLER_HPP diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp index 37363fd0..22085e02 100644 --- a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp @@ -5,22 +5,17 @@ #include #include #include -#include #include #include #include -#include #include -#include #include #include #include #include #include -#include #include -#include #include #include @@ -74,25 +69,7 @@ auto UnstructuredIrStreamReader::get_filtered_log_event_map() const -> FilteredL } void UnstructuredIrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) { - if (log_level_filter.isNull()) { - m_filtered_log_event_map.reset(); - return; - } - - m_filtered_log_event_map.emplace(); - auto filter_levels{emscripten::vecFromJSArray>(log_level_filter - )}; - for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) { - auto const& log_event = m_encoded_log_events[log_event_idx]; - if (std::ranges::find( - filter_levels, - clp::enum_to_underlying_type(log_event.get_log_level()) - ) - != filter_levels.end()) - { - m_filtered_log_event_map->emplace_back(log_event_idx); - } - } + 
generic_filter_log_events(m_filtered_log_event_map, log_level_filter, m_encoded_log_events); } auto UnstructuredIrStreamReader::deserialize_stream() -> size_t { @@ -155,38 +132,9 @@ auto UnstructuredIrStreamReader::deserialize_stream() -> size_t { auto UnstructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType { - if (use_filter && false == m_filtered_log_event_map.has_value()) { - return DecodedResultsTsType{emscripten::val::null()}; - } - - size_t length{0}; - if (use_filter) { - length = m_filtered_log_event_map->size(); - } else { - length = m_encoded_log_events.size(); - } - if (length < end_idx || begin_idx > end_idx) { - return DecodedResultsTsType{emscripten::val::null()}; - } - - std::string message; - constexpr size_t cDefaultReservedMessageLength{512}; - message.reserve(cDefaultReservedMessageLength); - auto const results{emscripten::val::array()}; - - for (size_t i = begin_idx; i < end_idx; ++i) { - size_t log_event_idx{0}; - if (use_filter) { - log_event_idx = m_filtered_log_event_map->at(i); - } else { - log_event_idx = i; - } - auto const& log_event_with_filter_data{m_encoded_log_events[log_event_idx]}; - auto const& unstructured_log_event = log_event_with_filter_data.get_log_event(); - auto const& log_level = log_event_with_filter_data.get_log_level(); - auto const& timestamp = log_event_with_filter_data.get_timestamp(); - - auto const parsed{unstructured_log_event.get_message().decode_and_unparse()}; + auto log_event_to_string = [this](UnstructuredLogEvent const& log_event) -> std::string { + std::string message; + auto const parsed{log_event.get_message().decode_and_unparse()}; if (false == parsed.has_value()) { throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Failure, @@ -196,20 +144,18 @@ auto UnstructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, }; } message = parsed.value(); - - m_ts_pattern.insert_formatted_timestamp(timestamp, message); - - EM_ASM( - { 
Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, - results.as_handle(), - message.c_str(), - timestamp, - log_level, - log_event_idx + 1 - ); - } - - return DecodedResultsTsType(results); + m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message); + return message; + }; + + return generic_decode_range( + begin_idx, + end_idx, + m_filtered_log_event_map, + m_encoded_log_events, + log_event_to_string, + use_filter + ); } UnstructuredIrStreamReader::UnstructuredIrStreamReader( diff --git a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp index 20137c39..60e515f3 100644 --- a/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp +++ b/src/clp_ffi_js/ir/UnstructuredIrStreamReader.hpp @@ -4,8 +4,6 @@ #include #include #include -#include -#include #include #include @@ -19,12 +17,7 @@ namespace clp_ffi_js::ir { using clp::ir::four_byte_encoded_variable_t; using UnstructuredIrDeserializer = clp::ir::LogEventDeserializer; - -/** - * Mapping between an index in the filtered log events collection to an index in the unfiltered - * log events collection. - */ -using FilteredLogEventsMap = std::optional>; +using UnstructuredLogEvents = LogEvents; /** * Class to deserialize and decode Zstd-compressed CLP unstructured IR streams, as well as format @@ -85,7 +78,7 @@ class UnstructuredIrStreamReader : public StreamReader { ); // Variables - std::vector> m_encoded_log_events; + UnstructuredLogEvents m_encoded_log_events; std::unique_ptr> m_stream_reader_data_context; FilteredLogEventsMap m_filtered_log_event_map;