Skip to content

Commit

Permalink
Add log-level filtering methods and associated refactoring: (#12)
Browse files Browse the repository at this point in the history
- Move log-level parsing from the decode method to the deserialization method.
- Remove unused range parameters from the deserialize method.
  • Loading branch information
davemarco authored Oct 10, 2024
1 parent 8d181d1 commit 4245173
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 70 deletions.
25 changes: 21 additions & 4 deletions src/clp_ffi_js/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,31 @@
#define CLP_FFI_JS_CONSTANTS_HPP

#include <array>
#include <cstddef>
#include <cstdint>
#include <string_view>

namespace clp_ffi_js {
constexpr size_t cLogLevelNone{0};
constexpr size_t cValidLogLevelsBeginIdx{cLogLevelNone + 1};
/**
* Enum of known log levels.
*/
enum class LogLevel : std::uint8_t {
NONE = 0, // This isn't a valid log level.
TRACE,
DEBUG,
INFO,
WARN,
ERROR,
FATAL,
};
constexpr LogLevel cValidLogLevelsBeginIdx{LogLevel::TRACE};

/**
* Strings corresponding to `LogLevel`.
*
* NOTE: These must be kept in sync manually.
*/
constexpr std::array<std::string_view, 7> cLogLevelNames{
"NONE", // This should not be used.
"NONE", // This isn't a valid log level.
"TRACE",
"DEBUG",
"INFO",
Expand Down
41 changes: 41 additions & 0 deletions src/clp_ffi_js/ir/LogEventWithLevel.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#ifndef CLP_FFI_JS_IR_LOGEVENTWITHLEVEL_HPP
#define CLP_FFI_JS_IR_LOGEVENTWITHLEVEL_HPP

#include <utility>

#include <clp/ir/EncodedTextAst.hpp>
#include <clp/ir/LogEvent.hpp>
#include <clp/ir/types.hpp>
#include <clp/time_types.hpp>

#include <clp_ffi_js/constants.hpp>

namespace clp_ffi_js::ir {
/**
* A class derived from `clp::ir::LogEvent` with an additional member for the log level.
*
* NOTE: Once we move to the next IR format, this class should no longer be necessary since each
* IR log event will contain a set of key-value pairs, one of which should be the log level.
* @tparam encoded_variable_t The type of encoded variables in the event
*/
template <typename encoded_variable_t>
class LogEventWithLevel : public clp::ir::LogEvent<encoded_variable_t> {
public:
// Constructors
LogEventWithLevel(
clp::ir::epoch_time_ms_t timestamp,
clp::UtcOffset utc_offset,
clp::ir::EncodedTextAst<encoded_variable_t> message,
LogLevel log_level
)
: clp::ir::LogEvent<encoded_variable_t>{timestamp, utc_offset, std::move(message)},
m_log_level{log_level} {}

[[nodiscard]] auto get_log_level() const -> LogLevel { return m_log_level; }

private:
LogLevel m_log_level;
};
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_IR_LOGEVENTWITHLEVEL_HPP
161 changes: 108 additions & 53 deletions src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,29 @@
#include <cstdint>
#include <iterator>
#include <memory>
#include <span>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ir/LogEvent.hpp>
#include <clp/ir/LogEventDeserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <clp/TraceableException.hpp>
#include <clp/type_utils.hpp>
#include <emscripten/em_asm.h>
#include <emscripten/val.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

using namespace std::literals::string_literals;
Expand Down Expand Up @@ -99,25 +101,47 @@ auto StreamReader::get_num_events_buffered() const -> size_t {
return m_encoded_log_events.size();
}

auto StreamReader::deserialize_range(size_t begin_idx, size_t end_idx) -> size_t {
constexpr size_t cFullRangeEndIdx{0};
if (0 != begin_idx || cFullRangeEndIdx != end_idx) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
"Partial range deserialization is not yet supported."
};
auto StreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType {
if (false == m_filtered_log_event_map.has_value()) {
return FilteredLogEventMapTsType{emscripten::val::null()};
}
if (nullptr != m_stream_reader_data_context) {
constexpr size_t cDefaultNumReservedLogEvents{500'000};
m_encoded_log_events.reserve(cDefaultNumReservedLogEvents);
while (true) {
auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()};
if (false == result.has_error()) {
m_encoded_log_events.emplace_back(std::move(result.value()));
continue;
}

return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())};
}

void StreamReader::filter_log_events(emscripten::val const& log_level_filter) {
if (log_level_filter.isNull()) {
m_filtered_log_event_map.reset();
return;
}

m_filtered_log_event_map.emplace();
auto filter_levels{emscripten::vecFromJSArray<std::underlying_type_t<LogLevel>>(log_level_filter
)};
for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) {
auto const& log_event = m_encoded_log_events[log_event_idx];
if (std::ranges::find(
filter_levels,
clp::enum_to_underlying_type(log_event.get_log_level())
)
!= filter_levels.end())
{
m_filtered_log_event_map->emplace_back(log_event_idx);
}
}
}

auto StreamReader::deserialize_stream() -> size_t {
if (nullptr == m_stream_reader_data_context) {
return m_encoded_log_events.size();
}

constexpr size_t cDefaultNumReservedLogEvents{500'000};
m_encoded_log_events.reserve(cDefaultNumReservedLogEvents);

while (true) {
auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()};
if (result.has_error()) {
auto const error{result.error()};
if (std::errc::no_message_available == error) {
break;
Expand All @@ -133,51 +157,76 @@ auto StreamReader::deserialize_range(size_t begin_idx, size_t end_idx) -> size_t
"Failed to deserialize: "s + error.category().name() + ":" + error.message()
};
}
m_stream_reader_data_context.reset(nullptr);
}
auto const& log_event = result.value();
auto const& message = log_event.get_message();

auto const& logtype = message.get_logtype();
constexpr size_t cLogLevelPositionInMessages{1};
LogLevel log_level{LogLevel::NONE};
if (logtype.length() > cLogLevelPositionInMessages) {
// NOLINTNEXTLINE(readability-qualified-auto)
auto const log_level_name_it{std::find_if(
cLogLevelNames.begin() + static_cast<size_t>(cValidLogLevelsBeginIdx),
cLogLevelNames.end(),
[&](std::string_view level) {
return logtype.substr(cLogLevelPositionInMessages).starts_with(level);
}
)};
if (log_level_name_it != cLogLevelNames.end()) {
log_level = static_cast<LogLevel>(
std::distance(cLogLevelNames.begin(), log_level_name_it)
);
}
}

auto log_viewer_event{LogEventWithLevel<four_byte_encoded_variable_t>(
log_event.get_timestamp(),
log_event.get_utc_offset(),
message,
log_level
)};
m_encoded_log_events.emplace_back(std::move(log_viewer_event));
}
m_stream_reader_data_context.reset(nullptr);
return m_encoded_log_events.size();
}

auto StreamReader::decode_range(size_t begin_idx, size_t end_idx) const -> DecodedResultsTsType {
if (m_encoded_log_events.size() < end_idx || begin_idx >= end_idx) {
return DecodedResultsTsType(emscripten::val::null());
auto StreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType {
if (use_filter && false == m_filtered_log_event_map.has_value()) {
return DecodedResultsTsType{emscripten::val::null()};
}

size_t length{0};
if (use_filter) {
length = m_filtered_log_event_map->size();
} else {
length = m_encoded_log_events.size();
}
if (length < end_idx || begin_idx > end_idx) {
return DecodedResultsTsType{emscripten::val::null()};
}

std::span const log_events_span{
m_encoded_log_events.begin()
+ static_cast<decltype(m_encoded_log_events)::difference_type>(begin_idx),
m_encoded_log_events.begin()
+ static_cast<decltype(m_encoded_log_events)::difference_type>(end_idx)
};
std::string message;
constexpr size_t cDefaultReservedMessageLength{512};
message.reserve(cDefaultReservedMessageLength);
size_t log_num{begin_idx + 1};
auto const results{emscripten::val::array()};
for (auto const& log_event : log_events_span) {
message.clear();

for (size_t i = begin_idx; i < end_idx; ++i) {
size_t log_event_idx{0};
if (use_filter) {
log_event_idx = m_filtered_log_event_map->at(i);
} else {
log_event_idx = i;
}
auto const& log_event{m_encoded_log_events[log_event_idx]};

auto const parsed{log_event.get_message().decode_and_unparse()};
if (false == parsed.has_value()) {
SPDLOG_ERROR("Failed to decode message.");
break;
}
message.append(parsed.value());

constexpr size_t cLogLevelPositionInMessages{1};
size_t log_level{cLogLevelNone};
// NOLINTNEXTLINE(readability-qualified-auto)
auto const log_level_name_it{std::find_if(
cLogLevelNames.begin() + cValidLogLevelsBeginIdx,
cLogLevelNames.end(),
[&](std::string_view level) {
return message.substr(cLogLevelPositionInMessages).starts_with(level);
}
)};
if (log_level_name_it != cLogLevelNames.end()) {
log_level = std::distance(cLogLevelNames.begin(), log_level_name_it);
}
message = parsed.value();

m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message);

Expand All @@ -186,10 +235,9 @@ auto StreamReader::decode_range(size_t begin_idx, size_t end_idx) const -> Decod
results.as_handle(),
message.c_str(),
log_event.get_timestamp(),
log_level,
log_num
log_event.get_log_level(),
log_event_idx + 1
);
++log_num;
}

return DecodedResultsTsType(results);
Expand All @@ -211,6 +259,8 @@ EMSCRIPTEN_BINDINGS(ClpIrStreamReader) {
emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
"Array<[string, number, number, number]>"
);
emscripten::register_type<clp_ffi_js::ir::FilteredLogEventMapTsType>("number[] | null");

emscripten::class_<clp_ffi_js::ir::StreamReader>("ClpIrStreamReader")
.constructor(
&clp_ffi_js::ir::StreamReader::create,
Expand All @@ -220,7 +270,12 @@ EMSCRIPTEN_BINDINGS(ClpIrStreamReader) {
"getNumEventsBuffered",
&clp_ffi_js::ir::StreamReader::get_num_events_buffered
)
.function("deserializeRange", &clp_ffi_js::ir::StreamReader::deserialize_range)
.function(
"getFilteredLogEventMap",
&clp_ffi_js::ir::StreamReader::get_filtered_log_event_map
)
.function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events)
.function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream)
.function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range);
}
} // namespace
Loading

0 comments on commit 4245173

Please sign in to comment.