Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Prototype: Add initial code for IRv2 decoder. #10

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
acd4563
Add initial code for IRv2 decoder.
junhaoliao Oct 1, 2024
d883966
Revert changes to enable exception catching linker flags - Update CMa…
junhaoliao Oct 8, 2024
3301fb6
Merge branch 'main' into irv2
junhaoliao Oct 8, 2024
322b60a
Switch clp to OSS' main.
junhaoliao Oct 8, 2024
ab37228
Add code to parse and validate the CLP IR version.
junhaoliao Oct 8, 2024
f2ed47e
Templatize StreamReaderDataContext.
junhaoliao Oct 8, 2024
9062027
Rename IRv2 specialized StreamReader -> KVPairIRStreamReader.
junhaoliao Oct 8, 2024
0f25962
Create a new StreamReader base class and refactor KVPairIRStreamReade…
junhaoliao Oct 10, 2024
80f059b
Revert KVPairIRStreamReader(StreamReaderDataContext<deserializer_t>&&…
junhaoliao Oct 10, 2024
9be08c4
Reformat code.
junhaoliao Oct 10, 2024
7842d29
Optimize imports.
junhaoliao Oct 10, 2024
541b84e
Merge branch 'main' into irv2
junhaoliao Oct 10, 2024
5b49ec0
Rename the original StreamReader -> IrStreamReader and adapt it to th…
junhaoliao Oct 10, 2024
ff7321b
Add log level filtering to KVPairIRStreamReader.
junhaoliao Oct 11, 2024
98b0b70
Rename IrStreamReader -> IRStreamReader.
junhaoliao Oct 11, 2024
f080ece
Update to latest CLP commit.
junhaoliao Nov 1, 2024
7c529d8
Upgrade clp submodule commit to 9f6a6ced4da504f6ba3c131efa26fd5b30c6f533
junhaoliao Nov 6, 2024
6c0ecde
Add tree node id parsing for timestampKey and logLevelKey.
junhaoliao Nov 6, 2024
40908f0
Complete log level filtering.
junhaoliao Nov 6, 2024
e411420
Update yscope-dev-utils version.
junhaoliao Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,22 @@ target_include_directories(

target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/)

set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp)
# Source files that live in this project's own tree (src/clp_ffi_js/).
set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/KVPairIRStreamReader.cpp
src/clp_ffi_js/ir/StreamReader.cpp
)

# Source files taken from the CLP submodule's core component
# (src/submodules/clp/components/core/src/clp/).
set(CLP_FFI_JS_SRC_CLP_CORE
src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/Deserializer.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp
src/submodules/clp/components/core/src/clp/ffi/ir_stream/utils.cpp
src/submodules/clp/components/core/src/clp/ffi/KeyValuePairLogEvent.cpp
src/submodules/clp/components/core/src/clp/ffi/SchemaTree.cpp
src/submodules/clp/components/core/src/clp/ir/EncodedTextAst.cpp
src/submodules/clp/components/core/src/clp/ir/LogEventDeserializer.cpp
src/submodules/clp/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp
src/submodules/clp/components/core/src/clp/ReaderInterface.cpp
src/submodules/clp/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp
src/submodules/clp/components/core/src/clp/TimestampPattern.cpp
)

set(CLP_FFI_JS_SRC_FMT src/submodules/fmt/src/format.cc)
Expand Down
184 changes: 184 additions & 0 deletions src/clp_ffi_js/ir/KVPairIRStreamReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#include "KVPairIRStreamReader.hpp"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <span>
#include <string>
#include <string_view>
#include <system_error>
#include <utility>
#include <vector>

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ir/LogEventDeserializer.hpp>
#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <clp/TraceableException.hpp>
#include <emscripten/em_asm.h>
#include <emscripten/val.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>
#include <clp_ffi_js/ir/StreamReader.hpp>

using namespace std::literals::string_literals;
using clp::ir::four_byte_encoded_variable_t;

namespace clp_ffi_js::ir {
/**
 * Builds a reader from a JavaScript byte array: the bytes are copied into a C++-side buffer, a
 * Zstandard decompressor is opened over that buffer, and an IR deserializer is created on top of
 * the decompressor. All three are then bundled into the returned reader.
 *
 * @param data_array The JavaScript array whose contents are copied.
 * @return The newly-created reader.
 * @throw ClpFfiJsException if the deserializer can't be created.
 */
auto KVPairIRStreamReader::create(DataArrayTsType const& data_array) -> KVPairIRStreamReader {
    auto const num_bytes{data_array["length"].as<size_t>()};
    SPDLOG_INFO("KVPairIRStreamReader::create: got buffer of length={}", num_bytes);

    // Have JavaScript write the array's contents into our buffer by calling `set` on the
    // module's `HEAPU8` property with the buffer's address as the offset.
    clp::Array<char> ir_buffer{num_bytes};
    // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast)
    emscripten::val::module_property("HEAPU8").call<void>(
            "set",
            data_array,
            reinterpret_cast<uintptr_t>(ir_buffer.data())
    );
    // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast)

    auto decompressor{std::make_unique<clp::streaming_compression::zstd::Decompressor>()};
    decompressor->open(ir_buffer.data(), num_bytes);

    auto deserializer_result{clp::ffi::ir_stream::Deserializer::create(*decompressor)};
    if (false == deserializer_result.has_error()) {
        // Success: hand the buffer, the decompressor, and the deserializer over to the reader.
        StreamReaderDataContext data_context{
                std::move(ir_buffer),
                std::move(decompressor),
                std::move(deserializer_result.value())
        };
        return KVPairIRStreamReader{std::move(data_context)};
    }

    auto const creation_error{deserializer_result.error()};
    SPDLOG_CRITICAL(
            "Failed to create deserializer: {}:{}",
            creation_error.category().name(),
            creation_error.message()
    );
    throw ClpFfiJsException{
            clp::ErrorCode::ErrorCode_Failure,
            __FILENAME__,
            __LINE__,
            "Failed to create deserializer"
    };
}

// Reports how many log events are currently held in the in-memory event buffer.
auto KVPairIRStreamReader::get_num_events_buffered() const -> size_t {
    auto const num_events_buffered{m_encoded_log_events.size()};
    return num_events_buffered;
}

// Stub: no filtered map is maintained by this reader yet, so a JavaScript `null` is always
// returned.
auto KVPairIRStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType {
    auto const js_null{emscripten::val::null()};
    return FilteredLogEventMapTsType(js_null);
}

// Stub: the body is intentionally empty, so `log_level_filter` is ignored and no state changes.
auto KVPairIRStreamReader::filter_log_events(emscripten::val const &log_level_filter) -> void {

}

/**
 * Reads log events from the stream into the in-memory buffer until the deserializer reports that
 * no further events are available, then releases the stream context. If the context has already
 * been released, nothing is read.
 *
 * @return The number of log events held in the buffer.
 * @throw ClpFfiJsException on any deserialization error other than `no_message_available` or
 * `result_out_of_range`.
 */
auto KVPairIRStreamReader::deserialize_stream() -> size_t {
    if (nullptr == m_stream_reader_data_context) {
        // There's no stream context left to read from; report what's already buffered.
        return m_encoded_log_events.size();
    }

    constexpr size_t cDefaultNumReservedLogEvents{500'000};
    m_encoded_log_events.reserve(cDefaultNumReservedLogEvents);

    auto& reader{m_stream_reader_data_context->get_reader()};
    for (;;) {
        auto next_event_result{
                m_stream_reader_data_context->get_deserializer().deserialize_to_next_log_event(
                        reader
                )
        };
        if (next_event_result.has_error()) {
            auto const error{next_event_result.error()};
            if (std::errc::no_message_available == error) {
                // Nothing more to read; stop without logging.
                break;
            }
            if (std::errc::result_out_of_range == error) {
                // Keep whatever was deserialized before the stream got cut off.
                SPDLOG_ERROR("File contains an incomplete IR stream");
                break;
            }
            throw ClpFfiJsException{
                    clp::ErrorCode::ErrorCode_Corrupt,
                    __FILENAME__,
                    __LINE__,
                    "Failed to deserialize: "s + error.category().name() + ":" + error.message()
            };
        }

        m_encoded_log_events.emplace_back(std::move(next_event_result.value()));
    }

    // The stream has been fully consumed, so its context is no longer needed.
    m_stream_reader_data_context.reset(nullptr);
    return m_encoded_log_events.size();
}

/**
 * Serializes the buffered log events in `[begin_idx, end_idx)` to JSON and collects them in a
 * JavaScript array of `[json_string, log_event_number]` pairs, where the number is the event's
 * 1-based position in the buffer.
 *
 * If an event can't be serialized, an error is logged and the pairs collected so far are returned.
 * `use_filter` is not consulted by this implementation.
 *
 * @param begin_idx
 * @param end_idx
 * @param use_filter
 * @return The array of pairs, or null if the range is empty or extends past the buffered events.
 */
auto KVPairIRStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
        -> DecodedResultsTsType {
    bool const is_range_valid{begin_idx < end_idx && end_idx <= m_encoded_log_events.size()};
    if (false == is_range_valid) {
        return DecodedResultsTsType(emscripten::val::null());
    }

    auto const results{emscripten::val::array()};
    for (size_t idx{begin_idx}; idx < end_idx; ++idx) {
        auto const json_result{m_encoded_log_events[idx].serialize_to_json()};
        if (false == json_result.has_value()) {
            SPDLOG_ERROR("Failed to decode message.");
            break;
        }

        // Keep the dumped string alive in a local for the duration of the JavaScript call.
        auto const json_str{json_result.value().dump()};
        size_t log_num{idx + 1};
        EM_ASM(
                { Emval.toValue($0).push([UTF8ToString($1), $2]); },
                results.as_handle(),
                json_str.c_str(),
                log_num
        );
    }

    return DecodedResultsTsType(results);
}

// Moves the given context into a heap allocation owned by this reader.
KVPairIRStreamReader::KVPairIRStreamReader(
        StreamReaderDataContext<deserializer_t>&& stream_reader_data_context
)
        : m_stream_reader_data_context(
                  std::make_unique<StreamReaderDataContext<deserializer_t>>(
                          std::move(stream_reader_data_context)
                  )
          ) {}
} // namespace clp_ffi_js::ir

namespace {
// Embind registrations: the TypeScript names for the custom value types, followed by the
// `ClpKVPairIRStreamReader` class (declared as derived from `StreamReader`) and the
// `ClpStreamReader` class itself.
EMSCRIPTEN_BINDINGS(ClpIrStreamReader) {
    using clp_ffi_js::ir::DataArrayTsType;
    using clp_ffi_js::ir::DecodedResultsTsType;
    using clp_ffi_js::ir::KVPairIRStreamReader;
    using clp_ffi_js::ir::StreamReader;

    emscripten::register_type<DataArrayTsType>("Uint8Array");
    emscripten::register_type<DecodedResultsTsType>("Array<[string, number]>");

    emscripten::class_<KVPairIRStreamReader, emscripten::base<StreamReader>>(
            "ClpKVPairIRStreamReader"
    )
            .constructor(
                    &KVPairIRStreamReader::create,
                    emscripten::return_value_policy::take_ownership()
            )
            .function("getNumEventsBuffered", &KVPairIRStreamReader::get_num_events_buffered)
            .function("deserializeStream", &KVPairIRStreamReader::deserialize_stream)
            .function("decodeRange", &KVPairIRStreamReader::decode_range);

    emscripten::class_<StreamReader>("ClpStreamReader")
            .constructor(
                    &StreamReader::create,
                    emscripten::return_value_policy::take_ownership()
            );
}
} // namespace
96 changes: 96 additions & 0 deletions src/clp_ffi_js/ir/KVPairIRStreamReader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#ifndef CLP_FFI_JS_KV_PAIR_IR_STREAM_READER_HPP
#define CLP_FFI_JS_KV_PAIR_IR_STREAM_READER_HPP

#include <cstddef>
#include <memory>
#include <vector>

#include <clp/ir/LogEvent.hpp>
#include <clp/ffi/KeyValuePairLogEvent.hpp>
#include <emscripten/bind.h>
#include <emscripten/val.h>

#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>
#include <clp_ffi_js/ir/StreamReader.hpp>

namespace clp_ffi_js::ir {
/**
* Class to deserialize Zstandard-compressed CLP key-value pair IR streams, buffer the resulting
* log events in memory, and decode buffered log events into JSON strings.
*/
class KVPairIRStreamReader: public StreamReader {
public:
/**
* Creates a KVPairIRStreamReader to read from the given array.
*
* @param data_array An array containing a Zstandard-compressed IR stream.
* @return The created instance.
* @throw ClpFfiJsException if the deserializer for the stream can't be created.
*/
[[nodiscard]] static auto create(DataArrayTsType const& data_array) -> KVPairIRStreamReader;

// Destructor
~KVPairIRStreamReader() override = default;

// Disable copy constructor and assignment operator
KVPairIRStreamReader(KVPairIRStreamReader const&) = delete;
auto operator=(KVPairIRStreamReader const&) -> KVPairIRStreamReader& = delete;

// Define default move constructor
KVPairIRStreamReader(KVPairIRStreamReader&&) = default;
// Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`.
// NOTE(review): this class holds a context for `clp::ffi::ir_stream::Deserializer` (see
// `deserializer_t`), not `clp::ir::LogEventDeserializer` -- confirm the rationale still applies.
auto operator=(KVPairIRStreamReader&&) -> KVPairIRStreamReader& = delete;

/**
* @return The number of events buffered.
*/
[[nodiscard]] auto get_num_events_buffered() const -> size_t override;

/**
* NOTE: The current implementation is a stub that always returns null.
*
* @return null.
*/
[[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override;

/**
* NOTE: The current implementation is a stub that does nothing.
*
* @param log_level_filter Ignored by the current implementation.
*/
auto filter_log_events(emscripten::val const &log_level_filter) -> void override;
/**
* Deserializes and buffers all remaining log events in the stream. After the stream has been
* exhausted, its context is deallocated; calls made after that only return the number of log
* events already buffered.
*
* NOTE: Currently, this class only supports deserializing the full range of log events in the
* stream.
*
* @return The number of successfully deserialized ("valid") log events.
* @throw ClpFfiJsException if deserialization fails with an error other than reaching the end of
* the stream or encountering an incomplete stream.
*/
[[nodiscard]] auto deserialize_stream() -> size_t override;

/**
* Decodes the deserialized log events in the range `[begin_idx, end_idx)`.
*
* If a log event in the range fails to serialize to JSON, decoding stops and the log events
* decoded up to that point are returned.
*
* @param begin_idx
* @param end_idx
* @param use_filter Not used by the current implementation.
* @return An array where each element is a decoded log event represented by an array of:
* - The log event serialized as a JSON string
* - The log event's number (1-indexed) in the stream
* @return null if the range is empty (`begin_idx >= end_idx`) or if any log event in the range
* doesn't exist (e.g., the range exceeds the number of log events in the file).
*/
[[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType override;


// The deserializer type used to read log events from the stream.
using deserializer_t = clp::ffi::ir_stream::Deserializer;

// Constructor
/**
* @param stream_reader_data_context The stream context that the reader takes over; it is moved
* into the reader.
*/
explicit KVPairIRStreamReader(StreamReaderDataContext<deserializer_t>&& stream_reader_data_context);

private:



// Variables
// Log events buffered by `deserialize_stream`.
std::vector<clp::ffi::KeyValuePairLogEvent> m_encoded_log_events;
// The stream's context; reset to null once the stream has been fully deserialized.
std::unique_ptr<StreamReaderDataContext<deserializer_t>> m_stream_reader_data_context;
};
} // namespace clp_ffi_js::ir

#endif // CLP_FFI_JS_KV_PAIR_IR_STREAM_READER_HPP
Loading
Loading