Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype: Add initial code for IRv2 decoder. #10

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
acd4563
Add initial code for IRv2 decoder.
junhaoliao Oct 1, 2024
d883966
Revert changes to enable exception catching linker flags - Update CMa…
junhaoliao Oct 8, 2024
3301fb6
Merge branch 'main' into irv2
junhaoliao Oct 8, 2024
322b60a
Switch clp to OSS' main.
junhaoliao Oct 8, 2024
ab37228
Add code to parse and validate the CLP IR version.
junhaoliao Oct 8, 2024
f2ed47e
Templatize StreamReaderDataContext.
junhaoliao Oct 8, 2024
9062027
Rename IRv2 specialized StreamReader -> KVPairIRStreamReader.
junhaoliao Oct 8, 2024
0f25962
Create a new StreamReader base class and refactor KVPairIRStreamReade…
junhaoliao Oct 10, 2024
80f059b
Revert KVPairIRStreamReader(StreamReaderDataContext<deserializer_t>&&…
junhaoliao Oct 10, 2024
9be08c4
Reformat code.
junhaoliao Oct 10, 2024
7842d29
Optimize imports.
junhaoliao Oct 10, 2024
541b84e
Merge branch 'main' into irv2
junhaoliao Oct 10, 2024
5b49ec0
Rename the original StreamReader -> IrStreamReader and adapt it to th…
junhaoliao Oct 10, 2024
ff7321b
Add log level filtering to KVPairIRStreamReader.
junhaoliao Oct 11, 2024
98b0b70
Rename IrStreamReader -> IRStreamReader.
junhaoliao Oct 11, 2024
f080ece
Update to latest CLP commit.
junhaoliao Nov 1, 2024
7c529d8
Upgrade clp submodule commit to 9f6a6ced4da504f6ba3c131efa26fd5b30c6f533
junhaoliao Nov 6, 2024
6c0ecde
Add tree node id parsing for timestampKey and logLevelKey.
junhaoliao Nov 6, 2024
40908f0
Complete log level filtering.
junhaoliao Nov 6, 2024
e411420
Update yscope-dev-utils version.
junhaoliao Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ target_link_options(
-sEXPORT_ES6
-sMODULARIZE
-sWASM_BIGINT
-fwasm-exceptions
--emit-tsd ${CLP_FFI_JS_BIN_NAME}.d.ts
)
target_link_libraries(${CLP_FFI_JS_BIN_NAME} PRIVATE embind)
Expand All @@ -112,10 +113,19 @@ target_include_directories(

target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/)

set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp)
# Source files under src/clp_ffi_js/: the bindings translation unit plus the three stream reader
# implementations (KVPairIRStreamReader, IRStreamReader and StreamReader).
set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/bindings.cpp
src/clp_ffi_js/ir/KVPairIRStreamReader.cpp
src/clp_ffi_js/ir/IRStreamReader.cpp
src/clp_ffi_js/ir/StreamReader.cpp
)

set(CLP_FFI_JS_SRC_CLP_CORE
src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/utils.cpp
src/submodules/clp/components/core/src/clp/ffi/KeyValuePairLogEvent.cpp
src/submodules/clp/components/core/src/clp/ffi/SchemaTree.cpp
src/submodules/clp/components/core/src/clp/ir/EncodedTextAst.cpp
src/submodules/clp/components/core/src/clp/ir/LogEventDeserializer.cpp
src/submodules/clp/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp
Expand Down
60 changes: 60 additions & 0 deletions src/clp_ffi_js/bindings.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#include <emscripten/bind.h>
#include "clp_ffi_js/ir/StreamReader.hpp"
#include "clp_ffi_js/ir/KVPairIRStreamReader.hpp"
#include "clp_ffi_js/ir/IRStreamReader.hpp"

namespace {
// Registers the embind bindings for the stream readers.
//
// The strings passed to `register_type` are the TypeScript type names under which the
// corresponding C++ `emscripten::val` wrapper types are registered, so each one must be a valid
// TypeScript type expression.
EMSCRIPTEN_BINDINGS(ClpIrStreamReader) {
    emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
    emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
            "Array<[string, number, number, number]>"
    );
    emscripten::register_type<clp_ffi_js::ir::FilteredLogEventMapTsType>("number[] | null");
    // An object-type literal is used here: `interface{...}` is a declaration form and isn't valid
    // where a type expression is expected.
    emscripten::register_type<clp_ffi_js::ir::ReaderOptions>(
            "{logLevelKey?: string, timestampKey?: string}"
    );

    // Reader bound with `IRStreamReader`'s methods.
    emscripten::class_<
            clp_ffi_js::ir::IRStreamReader,
            emscripten::base<clp_ffi_js::ir::StreamReader>>("ClpIrStreamReader")
            .constructor(
                    &clp_ffi_js::ir::IRStreamReader::create,
                    emscripten::return_value_policy::take_ownership()
            )
            .function(
                    "getNumEventsBuffered",
                    &clp_ffi_js::ir::IRStreamReader::get_num_events_buffered
            )
            .function(
                    "getFilteredLogEventMap",
                    &clp_ffi_js::ir::IRStreamReader::get_filtered_log_event_map
            )
            .function("filterLogEvents", &clp_ffi_js::ir::IRStreamReader::filter_log_events)
            .function("deserializeStream", &clp_ffi_js::ir::IRStreamReader::deserialize_stream)
            .function("decodeRange", &clp_ffi_js::ir::IRStreamReader::decode_range);

    // Reader bound with `KVPairIRStreamReader`'s methods; exposes the same JavaScript method
    // names as `ClpIrStreamReader`.
    emscripten::class_<
            clp_ffi_js::ir::KVPairIRStreamReader,
            emscripten::base<clp_ffi_js::ir::StreamReader>>("ClpKVPairIRStreamReader")
            .constructor(
                    &clp_ffi_js::ir::KVPairIRStreamReader::create,
                    emscripten::return_value_policy::take_ownership()
            )
            .function(
                    "getNumEventsBuffered",
                    &clp_ffi_js::ir::KVPairIRStreamReader::get_num_events_buffered
            )
            .function(
                    "getFilteredLogEventMap",
                    &clp_ffi_js::ir::KVPairIRStreamReader::get_filtered_log_event_map
            )
            .function(
                    "filterLogEvents",
                    &clp_ffi_js::ir::KVPairIRStreamReader::filter_log_events
            )
            .function(
                    "deserializeStream",
                    &clp_ffi_js::ir::KVPairIRStreamReader::deserialize_stream
            )
            .function("decodeRange", &clp_ffi_js::ir::KVPairIRStreamReader::decode_range);

    // Base class of both readers above; only its factory is bound as the constructor.
    emscripten::class_<clp_ffi_js::ir::StreamReader>("ClpStreamReader")
            .constructor(
                    &clp_ffi_js::ir::StreamReader::create,
                    emscripten::return_value_policy::take_ownership()
            );
}
}  // namespace
253 changes: 253 additions & 0 deletions src/clp_ffi_js/ir/IRStreamReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
#include "IRStreamReader.hpp"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ir/LogEventDeserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <clp/TraceableException.hpp>
#include <clp/type_utils.hpp>
#include <emscripten/em_asm.h>
#include <emscripten/val.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

using namespace std::literals::string_literals;
using clp::ir::four_byte_encoded_variable_t;

namespace clp_ffi_js::ir {
/**
 * Creates an `IRStreamReader` for the IR stream stored in the given JavaScript array.
 *
 * The array's bytes are copied into C++-owned memory, a zstd decompressor is opened on that copy,
 * and a four-byte-encoding log event deserializer is created on top of the decompressor.
 *
 * @param data_array JavaScript array containing the stream's bytes.
 * @param reader_options Forwarded unchanged to the `IRStreamReader` constructor.
 * @return The newly created reader.
 * @throw ClpFfiJsException if the encoding type can't be decoded, the stream doesn't use the
 * four-byte encoding, or the deserializer can't be created.
 */
auto IRStreamReader::create(DataArrayTsType const& data_array, ReaderOptions const& reader_options)
        -> IRStreamReader {
    using deserializer_t = clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>;

    auto const length{data_array["length"].as<size_t>()};
    SPDLOG_INFO("IRStreamReader::create: got buffer of length={}", length);

    // Copy array from JavaScript to C++ by having `HEAPU8.set` write `data_array` at the address
    // of `data_buffer`.
    clp::Array<char> data_buffer{length};
    // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast)
    emscripten::val::module_property("HEAPU8")
            .call<void>("set", data_array, reinterpret_cast<uintptr_t>(data_buffer.data()));
    // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast)

    auto zstd_decompressor{std::make_unique<clp::streaming_compression::zstd::Decompressor>()};
    zstd_decompressor->open(data_buffer.data(), length);

    bool is_four_bytes_encoding{true};
    if (auto const err{
                clp::ffi::ir_stream::get_encoding_type(*zstd_decompressor, is_four_bytes_encoding)
        };
        clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err)
    {
        SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err);
        throw ClpFfiJsException{
                clp::ErrorCode::ErrorCode_MetadataCorrupted,
                __FILENAME__,
                __LINE__,
                "Failed to decode encoding type."
        };
    }
    if (false == is_four_bytes_encoding) {
        throw ClpFfiJsException{
                clp::ErrorCode::ErrorCode_Unsupported,
                __FILENAME__,
                __LINE__,
                "IR stream uses unsupported encoding."
        };
    }

    auto result{deserializer_t::create(*zstd_decompressor)};
    if (result.has_error()) {
        auto const error_code{result.error()};
        SPDLOG_CRITICAL(
                "Failed to create deserializer: {}:{}",
                error_code.category().name(),
                error_code.message()
        );
        throw ClpFfiJsException{
                clp::ErrorCode::ErrorCode_Failure,
                __FILENAME__,
                __LINE__,
                "Failed to create deserializer"
        };
    }

    StreamReaderDataContext<deserializer_t> stream_reader_data_context{
            std::move(data_buffer),
            std::move(zstd_decompressor),
            std::move(result.value())
    };
    // `reader_options` is a const reference, so it can't be moved from; pass it as-is.
    return IRStreamReader{std::move(stream_reader_data_context), reader_options};
}

/**
 * @return The number of log events currently held in `m_encoded_log_events`.
 */
auto IRStreamReader::get_num_events_buffered() const -> size_t {
    auto const num_events_buffered{m_encoded_log_events.size()};
    return num_events_buffered;
}

/**
 * @return The filtered log event map as a JavaScript array, or JavaScript `null` if no filter
 * map is currently set.
 */
auto IRStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType {
    if (m_filtered_log_event_map.has_value()) {
        return FilteredLogEventMapTsType{
                emscripten::val::array(m_filtered_log_event_map.value())
        };
    }

    // No filter is set.
    return FilteredLogEventMapTsType{emscripten::val::null()};
}

/**
 * Rebuilds the filtered log event map so that it holds the indices (into
 * `m_encoded_log_events`) of the events whose log level appears in the given filter.
 *
 * @param log_level_filter JavaScript array of log level values to keep; JavaScript `null`
 * clears the filter map instead.
 */
void IRStreamReader::filter_log_events(emscripten::val const& log_level_filter) {
    if (log_level_filter.isNull()) {
        m_filtered_log_event_map.reset();
        return;
    }

    // Start from an empty map before converting the filter, as the map is always replaced.
    auto& matching_event_indices{m_filtered_log_event_map.emplace()};
    auto const accepted_levels{
            emscripten::vecFromJSArray<std::underlying_type_t<LogLevel>>(log_level_filter)
    };

    size_t event_idx{0};
    for (auto const& event : m_encoded_log_events) {
        auto const event_level{clp::enum_to_underlying_type(event.get_log_level())};
        bool const is_accepted{
                accepted_levels.end() != std::ranges::find(accepted_levels, event_level)
        };
        if (is_accepted) {
            matching_event_indices.emplace_back(event_idx);
        }
        ++event_idx;
    }
}

auto IRStreamReader::deserialize_stream() -> size_t {
if (nullptr == m_stream_reader_data_context) {
return m_encoded_log_events.size();
}

constexpr size_t cDefaultNumReservedLogEvents{500'000};
m_encoded_log_events.reserve(cDefaultNumReservedLogEvents);

while (true) {
auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()};
if (result.has_error()) {
auto const error{result.error()};
if (std::errc::no_message_available == error) {
break;
}
if (std::errc::result_out_of_range == error) {
SPDLOG_ERROR("File contains an incomplete IR stream");
break;
}
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Corrupt,
__FILENAME__,
__LINE__,
"Failed to deserialize: "s + error.category().name() + ":" + error.message()
};
}
auto const& log_event = result.value();
auto const& message = log_event.get_message();

auto const& logtype = message.get_logtype();
constexpr size_t cLogLevelPositionInMessages{1};
LogLevel log_level{LogLevel::NONE};
if (logtype.length() > cLogLevelPositionInMessages) {
// NOLINTNEXTLINE(readability-qualified-auto)
auto const log_level_name_it{std::find_if(
cLogLevelNames.begin() + static_cast<size_t>(cValidLogLevelsBeginIdx),
cLogLevelNames.end(),
[&](std::string_view level) {
return logtype.substr(cLogLevelPositionInMessages).starts_with(level);
}
)};
if (log_level_name_it != cLogLevelNames.end()) {
log_level = static_cast<LogLevel>(
std::distance(cLogLevelNames.begin(), log_level_name_it)
);
}
}

auto log_viewer_event{LogEventWithLevel<clp::ir::LogEvent<clp::ir::four_byte_encoded_variable_t>>(
std::move(log_event),
log_level
)};
m_encoded_log_events.emplace_back(std::move(log_viewer_event));
}
m_stream_reader_data_context.reset(nullptr);
return m_encoded_log_events.size();
}

/**
 * Decodes the log events in the range `[begin_idx, end_idx)`.
 *
 * Each decoded event is appended to the results as
 * `[message, timestamp, log level, log event index + 1]`, where the message has had the
 * formatted timestamp inserted.
 *
 * @param begin_idx Inclusive start of the range.
 * @param end_idx Exclusive end of the range.
 * @param use_filter Whether the range indexes the filtered log event map rather than the full
 * list of buffered events.
 * @return The decoded events; or JavaScript `null` if `use_filter` is set but no filter map
 * exists, or if the range is invalid. If a message fails to decode, decoding stops and only the
 * events decoded before it are returned.
 */
auto IRStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
        -> DecodedResultsTsType {
    bool const is_filter_missing{use_filter && false == m_filtered_log_event_map.has_value()};
    if (is_filter_missing) {
        return DecodedResultsTsType{emscripten::val::null()};
    }

    size_t const num_selectable_events{
            use_filter ? m_filtered_log_event_map->size() : m_encoded_log_events.size()
    };
    bool const is_range_valid{begin_idx <= end_idx && end_idx <= num_selectable_events};
    if (false == is_range_valid) {
        return DecodedResultsTsType{emscripten::val::null()};
    }

    // One buffer is reused for every decoded message.
    constexpr size_t cDefaultReservedMessageLength{512};
    std::string message;
    message.reserve(cDefaultReservedMessageLength);
    auto const results{emscripten::val::array()};

    for (size_t range_idx{begin_idx}; range_idx < end_idx; ++range_idx) {
        // With a filter, the range position is translated through the filter map.
        size_t const log_event_idx{
                use_filter ? m_filtered_log_event_map->at(range_idx) : range_idx
        };
        auto const& log_event_with_level{m_encoded_log_events[log_event_idx]};
        auto const& log_event{log_event_with_level.get_log_event()};

        auto const parsed{log_event.get_message().decode_and_unparse()};
        if (false == parsed.has_value()) {
            SPDLOG_ERROR("Failed to decode message.");
            break;
        }
        message = parsed.value();

        m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message);

        // Push the tuple onto the JavaScript array; the event number reported is one-based.
        EM_ASM(
                { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); },
                results.as_handle(),
                message.c_str(),
                log_event.get_timestamp(),
                log_event_with_level.get_log_level(),
                log_event_idx + 1
        );
    }

    return DecodedResultsTsType(results);
}

/**
 * Moves the given data context into heap storage owned by this reader and initializes the
 * timestamp pattern from the context's deserializer.
 *
 * @param stream_reader_data_context Data context to take over.
 * @param reader_options Not used by this constructor.
 */
IRStreamReader::IRStreamReader(
        StreamReaderDataContext<clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>>&&
                stream_reader_data_context,
        ReaderOptions const& reader_options
)
        : m_stream_reader_data_context{std::make_unique<StreamReaderDataContext<
                  clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>>>(
                  std::move(stream_reader_data_context)
          )},
          // Reads through the member, not the moved-from parameter.
          // NOTE(review): relies on `m_stream_reader_data_context` being declared before
          // `m_ts_pattern` in the class so that it's initialized first -- confirm in the header.
          m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {
}
} // namespace clp_ffi_js::ir
Loading