Skip to content

Commit

Permalink
feat: Add support for log-level filtering of structured IR streams. (#35
Browse files Browse the repository at this point in the history
)
  • Loading branch information
davemarco authored Dec 18, 2024
1 parent d2556b2 commit 5f9e50d
Show file tree
Hide file tree
Showing 11 changed files with 511 additions and 264 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ endif()
set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/StreamReader.cpp
src/clp_ffi_js/ir/StructuredIrStreamReader.cpp
src/clp_ffi_js/ir/StructuredIrUnitHandler.cpp
src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
)

Expand Down
21 changes: 12 additions & 9 deletions src/clp_ffi_js/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <array>
#include <cstdint>
#include <string_view>
#include <type_utils.hpp>

namespace clp_ffi_js {
/**
Expand All @@ -17,6 +18,7 @@ enum class LogLevel : std::uint8_t {
WARN,
ERROR,
FATAL,
LENGTH, // This isn't a valid log level.
};
constexpr LogLevel cValidLogLevelsBeginIdx{LogLevel::TRACE};

Expand All @@ -25,15 +27,16 @@ constexpr LogLevel cValidLogLevelsBeginIdx{LogLevel::TRACE};
*
* NOTE: These must be kept in sync manually.
*/
constexpr std::array<std::string_view, 7> cLogLevelNames{
"NONE", // This isn't a valid log level.
"TRACE",
"DEBUG",
"INFO",
"WARN",
"ERROR",
"FATAL",
};
constexpr std::array<std::string_view, clp::enum_to_underlying_type(LogLevel::LENGTH)>
cLogLevelNames{
"NONE", // This isn't a valid log level.
"TRACE",
"DEBUG",
"INFO",
"WARN",
"ERROR",
"FATAL",
};
} // namespace clp_ffi_js

#endif // CLP_FFI_JS_CONSTANTS_HPP
4 changes: 3 additions & 1 deletion src/clp_ffi_js/ir/LogEventWithFilterData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <concepts>
#include <utility>

#include <clp/ffi/KeyValuePairLogEvent.hpp>
#include <clp/ir/LogEvent.hpp>
#include <clp/ir/types.hpp>

Expand All @@ -12,6 +13,7 @@
namespace clp_ffi_js::ir {
using clp::ir::four_byte_encoded_variable_t;
using UnstructuredLogEvent = clp::ir::LogEvent<four_byte_encoded_variable_t>;
using StructuredLogEvent = clp::ffi::KeyValuePairLogEvent;

/**
* A templated class that extends a log event type with processed versions of some of its fields,
Expand All @@ -21,7 +23,7 @@ using UnstructuredLogEvent = clp::ir::LogEvent<four_byte_encoded_variable_t>;
* @tparam LogEvent The type of the log event.
*/
template <typename LogEvent>
requires std::same_as<LogEvent, UnstructuredLogEvent>
requires std::same_as<LogEvent, UnstructuredLogEvent> || std::same_as<LogEvent, StructuredLogEvent>
class LogEventWithFilterData {
public:
// Constructor
Expand Down
4 changes: 3 additions & 1 deletion src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ EMSCRIPTEN_BINDINGS(ClpStreamReader) {
// JS types used as inputs
emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
emscripten::register_type<clp_ffi_js::ir::LogLevelFilterTsType>("number[] | null");
emscripten::register_type<clp_ffi_js::ir::ReaderOptions>("{timestampKey: string} | null");
emscripten::register_type<clp_ffi_js::ir::ReaderOptions>(
"{logLevelKey: string, timestampKey: string} | null"
);

// JS types used as outputs
emscripten::enum_<clp_ffi_js::ir::StreamType>("IrStreamType")
Expand Down
155 changes: 155 additions & 0 deletions src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
#ifndef CLP_FFI_JS_IR_STREAMREADER_HPP
#define CLP_FFI_JS_IR_STREAMREADER_HPP

#include <algorithm>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <type_traits>
#include <vector>

#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <clp/type_utils.hpp>
#include <emscripten/em_asm.h>
#include <emscripten/val.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/LogEventWithFilterData.hpp>

namespace clp_ffi_js::ir {
// JS types used as inputs
Expand All @@ -23,6 +35,15 @@ enum class StreamType : uint8_t {
Unstructured,
};

template <typename LogEvent>
using LogEvents = std::vector<LogEventWithFilterData<LogEvent>>;

/**
* Mapping between an index in the filtered log events collection to an index in the unfiltered
* log events collection.
*/
using FilteredLogEventsMap = std::optional<std::vector<size_t>>;

/**
* Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded
* log events.
Expand All @@ -35,6 +56,7 @@ class StreamReader {
* Creates a `StreamReader` to read from the given array.
*
* @param data_array An array containing a Zstandard-compressed IR stream.
* @param reader_options
* @return The created instance.
* @throw ClpFfiJsException if any error occurs.
*/
Expand Down Expand Up @@ -79,6 +101,7 @@ class StreamReader {
* Deserializes all log events in the stream.
*
* @return The number of successfully deserialized ("valid") log events.
* @throw ClpFfiJsException if an error occurs during deserialization.
*/
[[nodiscard]] virtual auto deserialize_stream() -> size_t = 0;

Expand All @@ -96,13 +119,145 @@ class StreamReader {
* - The log event's number (1-indexed) in the stream
* @return null if any log event in the range doesn't exist (e.g. the range exceeds the number
* of log events in the collection).
* @throw ClpFfiJsException if a message cannot be decoded.
*/
[[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType = 0;

protected:
explicit StreamReader() = default;

/**
* Templated implementation of `decode_range` that uses `log_event_to_string` to convert
* `log_event` to a string for the returned result.
*
* @tparam LogEvent
* @tparam ToStringFunc Function to convert a log event into a string.
* @param begin_idx
* @param end_idx
* @param filtered_log_event_map
* @param log_events
* @param use_filter
* @param log_event_to_string
* @return See `decode_range`.
* @throws Propagates `ToStringFunc`'s exceptions.
*/
template <typename LogEvent, typename ToStringFunc>
requires requires(ToStringFunc func, LogEvent const& log_event) {
{
func(log_event)
} -> std::convertible_to<std::string>;
}
static auto generic_decode_range(
size_t begin_idx,
size_t end_idx,
FilteredLogEventsMap const& filtered_log_event_map,
LogEvents<LogEvent> const& log_events,
ToStringFunc log_event_to_string,
bool use_filter
) -> DecodedResultsTsType;

/**
* Templated implementation of `filter_log_events`.
*
* @tparam LogEvent
* @param log_level_filter
* @param log_events Derived class's log events.
* @param log_events
* @param[out] filtered_log_event_map Returns the filtered log events.
*/
template <typename LogEvent>
static auto generic_filter_log_events(
FilteredLogEventsMap& filtered_log_event_map,
LogLevelFilterTsType const& log_level_filter,
LogEvents<LogEvent> const& log_events
) -> void;
};

template <typename LogEvent, typename ToStringFunc>
requires requires(ToStringFunc func, LogEvent const& log_event) {
{
func(log_event)
} -> std::convertible_to<std::string>;
}
auto StreamReader::generic_decode_range(
size_t begin_idx,
size_t end_idx,
FilteredLogEventsMap const& filtered_log_event_map,
LogEvents<LogEvent> const& log_events,
ToStringFunc log_event_to_string,
bool use_filter
) -> DecodedResultsTsType {
if (use_filter && false == filtered_log_event_map.has_value()) {
return DecodedResultsTsType{emscripten::val::null()};
}

size_t length{0};
if (use_filter) {
length = filtered_log_event_map->size();
} else {
length = log_events.size();
}
if (length < end_idx || begin_idx > end_idx) {
SPDLOG_ERROR("Invalid log event index range: {}-{}", begin_idx, end_idx);
return DecodedResultsTsType{emscripten::val::null()};
}

auto const results{emscripten::val::array()};

for (size_t i = begin_idx; i < end_idx; ++i) {
size_t log_event_idx{0};
if (use_filter) {
log_event_idx = filtered_log_event_map->at(i);
} else {
log_event_idx = i;
}

auto const& log_event_with_filter_data{log_events.at(log_event_idx)};
auto const& log_event = log_event_with_filter_data.get_log_event();
auto const& timestamp = log_event_with_filter_data.get_timestamp();
auto const& log_level = log_event_with_filter_data.get_log_level();

EM_ASM(
{ Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); },
results.as_handle(),
log_event_to_string(log_event).c_str(),
timestamp,
log_level,
log_event_idx + 1
);
}

return DecodedResultsTsType(results);
}

template <typename LogEvent>
auto StreamReader::generic_filter_log_events(
FilteredLogEventsMap& filtered_log_event_map,
LogLevelFilterTsType const& log_level_filter,
LogEvents<LogEvent> const& log_events
) -> void {
if (log_level_filter.isNull()) {
filtered_log_event_map.reset();
return;
}

filtered_log_event_map.emplace();
auto filter_levels
= emscripten::vecFromJSArray<std::underlying_type_t<LogLevel>>(log_level_filter);

for (size_t log_event_idx = 0; log_event_idx < log_events.size(); ++log_event_idx) {
auto const& log_event = log_events[log_event_idx];
if (std::ranges::find(
filter_levels,
clp::enum_to_underlying_type(log_event.get_log_level())
)
!= filter_levels.end())
{
filtered_log_event_map->emplace_back(log_event_idx);
}
}
}
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_IR_STREAMREADER_HPP
Loading

0 comments on commit 5f9e50d

Please sign in to comment.