-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Split StreamReader
into an interface and implementation to prepare for adding another IR stream reader.
#26
Changes from 25 commits
3dae806
ab32045
8dc3dff
eba1d30
5398d1d
cfd55ca
d1ed2f0
bf5e4c9
ad306f2
c474bb4
f4a6896
dba3910
9ddd974
a5cb529
cd50c9a
d2cf122
7fe3a58
9b67be9
fb2b290
464cf31
6e8d025
19e41b9
4988949
c79a83f
ef0b86b
d9e9e97
efaeedd
3764298
c81cd4b
28d4250
c5db9a6
4a83bce
de7a2f8
1b31f08
226ab3a
fa4debd
4108398
c7a8fbf
d5cad52
bfea301
72bdf6e
119de6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,224 @@ | ||||||||||||||||||||||||||||||||||||||||
#include "IrStreamReader.hpp" | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
#include <algorithm> | ||||||||||||||||||||||||||||||||||||||||
#include <cstddef> | ||||||||||||||||||||||||||||||||||||||||
#include <iterator> | ||||||||||||||||||||||||||||||||||||||||
#include <memory> | ||||||||||||||||||||||||||||||||||||||||
#include <optional> | ||||||||||||||||||||||||||||||||||||||||
#include <string> | ||||||||||||||||||||||||||||||||||||||||
#include <string_view> | ||||||||||||||||||||||||||||||||||||||||
#include <system_error> | ||||||||||||||||||||||||||||||||||||||||
#include <type_traits> | ||||||||||||||||||||||||||||||||||||||||
#include <utility> | ||||||||||||||||||||||||||||||||||||||||
#include <vector> | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
#include <clp/Array.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp/ErrorCode.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp/ir/LogEventDeserializer.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp/ir/types.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp/streaming_compression/zstd/Decompressor.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp/TraceableException.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp/type_utils.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <emscripten/bind.h> | ||||||||||||||||||||||||||||||||||||||||
#include <emscripten/em_asm.h> | ||||||||||||||||||||||||||||||||||||||||
#include <emscripten/val.h> | ||||||||||||||||||||||||||||||||||||||||
#include <spdlog/spdlog.h> | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
#include <clp_ffi_js/ClpFfiJsException.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp_ffi_js/constants.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp_ffi_js/ir/decoding_methods.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp_ffi_js/ir/LogEventWithLevel.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp_ffi_js/ir/StreamReader.hpp> | ||||||||||||||||||||||||||||||||||||||||
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp> | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
namespace clp_ffi_js::ir { | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
using namespace std::literals::string_literals; | ||||||||||||||||||||||||||||||||||||||||
using clp::ir::four_byte_encoded_variable_t; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
auto IrStreamReader::get_num_events_buffered() const -> size_t { | ||||||||||||||||||||||||||||||||||||||||
return m_encoded_log_events.size(); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
auto IrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { | ||||||||||||||||||||||||||||||||||||||||
if (false == m_filtered_log_event_map.has_value()) { | ||||||||||||||||||||||||||||||||||||||||
return FilteredLogEventMapTsType{emscripten::val::null()}; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
void IrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) { | ||||||||||||||||||||||||||||||||||||||||
if (log_level_filter.isNull()) { | ||||||||||||||||||||||||||||||||||||||||
m_filtered_log_event_map.reset(); | ||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
m_filtered_log_event_map.emplace(); | ||||||||||||||||||||||||||||||||||||||||
auto filter_levels{emscripten::vecFromJSArray<std::underlying_type_t<LogLevel>>(log_level_filter | ||||||||||||||||||||||||||||||||||||||||
)}; | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling for invalid Currently, the function assumes that Apply this diff to enhance input validation: void IrStreamReader::filter_log_events(emscripten::val const& log_level_filter) {
if (log_level_filter.isNull()) {
m_filtered_log_event_map.reset();
return;
}
+ if (!log_level_filter.isArray()) {
+ throw ClpFfiJsException{
+ clp::ErrorCode::ErrorCode_BadParam,
+ __FILENAME__,
+ __LINE__,
+ "Expected an array for log_level_filter"
+ };
+ }
m_filtered_log_event_map.emplace();
auto filter_levels{emscripten::vecFromJSArray<std::underlying_type_t<LogLevel>>(log_level_filter
)};
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think for now this is skippable since the wrapper code is typed with TypeScript and any usage with non-array would fail static checks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @junhaoliao, understood. Since the wrapper code is typed with TypeScript and non-array inputs would fail static checks, additional error handling here is not necessary. ✏️ Learnings added
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually sorry for my oversight, I think we want to define a type (similar to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @junhaoliao, thank you for the clarification. Defining a specific type for using LogLevelFilterTsType = emscripten::val; // Define a type representing the log level filter
void IrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) {
// Implementation remains the same
} This approach makes the expected input type explicit in the method signature and aligns with the existing type definitions. Since this updates our previous understanding and may contradict the earlier learning (ID: 940b05fa-811e-47b0-98a2-bfbbe962ccf3) regarding the unnecessary runtime error handling for 🧠 Learnings used
|
||||||||||||||||||||||||||||||||||||||||
for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) { | ||||||||||||||||||||||||||||||||||||||||
auto const& log_event = m_encoded_log_events[log_event_idx]; | ||||||||||||||||||||||||||||||||||||||||
if (std::ranges::find( | ||||||||||||||||||||||||||||||||||||||||
filter_levels, | ||||||||||||||||||||||||||||||||||||||||
clp::enum_to_underlying_type(log_event.get_log_level()) | ||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||
!= filter_levels.end()) | ||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
m_filtered_log_event_map->emplace_back(log_event_idx); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
auto IrStreamReader::deserialize_stream() -> size_t { | ||||||||||||||||||||||||||||||||||||||||
if (nullptr == m_stream_reader_data_context) { | ||||||||||||||||||||||||||||||||||||||||
return m_encoded_log_events.size(); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
constexpr size_t cDefaultNumReservedLogEvents{500'000}; | ||||||||||||||||||||||||||||||||||||||||
m_encoded_log_events.reserve(cDefaultNumReservedLogEvents); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
while (true) { | ||||||||||||||||||||||||||||||||||||||||
auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()}; | ||||||||||||||||||||||||||||||||||||||||
if (result.has_error()) { | ||||||||||||||||||||||||||||||||||||||||
auto const error{result.error()}; | ||||||||||||||||||||||||||||||||||||||||
if (std::errc::no_message_available == error) { | ||||||||||||||||||||||||||||||||||||||||
break; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
if (std::errc::result_out_of_range == error) { | ||||||||||||||||||||||||||||||||||||||||
SPDLOG_ERROR("File contains an incomplete IR stream"); | ||||||||||||||||||||||||||||||||||||||||
break; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uniform error handling upon deserialization failure When encountering Here's a suggested change: if (std::errc::result_out_of_range == error) {
SPDLOG_ERROR("File contains an incomplete IR stream");
- break;
+ throw ClpFfiJsException{
+ clp::ErrorCode::ErrorCode_Corrupt,
+ __FILENAME__,
+ __LINE__,
+ "File contains an incomplete IR stream"
+ };
} 📝 Committable suggestion
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should not throw an exception here because we intend to handle missing End-of-Stream packet gracefully. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @junhaoliao, understood. Thank you for the clarification. ✏️ Learnings added
🧠 Learnings used
|
||||||||||||||||||||||||||||||||||||||||
throw ClpFfiJsException{ | ||||||||||||||||||||||||||||||||||||||||
clp::ErrorCode::ErrorCode_Corrupt, | ||||||||||||||||||||||||||||||||||||||||
__FILENAME__, | ||||||||||||||||||||||||||||||||||||||||
__LINE__, | ||||||||||||||||||||||||||||||||||||||||
"Failed to deserialize: "s + error.category().name() + ":" + error.message() | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
auto const& log_event = result.value(); | ||||||||||||||||||||||||||||||||||||||||
auto const& message = log_event.get_message(); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
auto const& logtype = message.get_logtype(); | ||||||||||||||||||||||||||||||||||||||||
constexpr size_t cLogLevelPositionInMessages{1}; | ||||||||||||||||||||||||||||||||||||||||
LogLevel log_level{LogLevel::NONE}; | ||||||||||||||||||||||||||||||||||||||||
if (logtype.length() > cLogLevelPositionInMessages) { | ||||||||||||||||||||||||||||||||||||||||
// NOLINTNEXTLINE(readability-qualified-auto) | ||||||||||||||||||||||||||||||||||||||||
auto const log_level_name_it{std::find_if( | ||||||||||||||||||||||||||||||||||||||||
cLogLevelNames.begin() + static_cast<size_t>(cValidLogLevelsBeginIdx), | ||||||||||||||||||||||||||||||||||||||||
cLogLevelNames.end(), | ||||||||||||||||||||||||||||||||||||||||
[&](std::string_view level) { | ||||||||||||||||||||||||||||||||||||||||
return logtype.substr(cLogLevelPositionInMessages).starts_with(level); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
)}; | ||||||||||||||||||||||||||||||||||||||||
if (log_level_name_it != cLogLevelNames.end()) { | ||||||||||||||||||||||||||||||||||||||||
log_level = static_cast<LogLevel>( | ||||||||||||||||||||||||||||||||||||||||
std::distance(cLogLevelNames.begin(), log_level_name_it) | ||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
auto log_viewer_event{LogEventWithLevel<four_byte_encoded_variable_t>( | ||||||||||||||||||||||||||||||||||||||||
log_event.get_timestamp(), | ||||||||||||||||||||||||||||||||||||||||
log_event.get_utc_offset(), | ||||||||||||||||||||||||||||||||||||||||
message, | ||||||||||||||||||||||||||||||||||||||||
log_level | ||||||||||||||||||||||||||||||||||||||||
)}; | ||||||||||||||||||||||||||||||||||||||||
m_encoded_log_events.emplace_back(std::move(log_viewer_event)); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
m_stream_reader_data_context.reset(nullptr); | ||||||||||||||||||||||||||||||||||||||||
return m_encoded_log_events.size(); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
auto IrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const | ||||||||||||||||||||||||||||||||||||||||
-> DecodedResultsTsType { | ||||||||||||||||||||||||||||||||||||||||
if (use_filter && false == m_filtered_log_event_map.has_value()) { | ||||||||||||||||||||||||||||||||||||||||
return DecodedResultsTsType{emscripten::val::null()}; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
size_t length{0}; | ||||||||||||||||||||||||||||||||||||||||
if (use_filter) { | ||||||||||||||||||||||||||||||||||||||||
length = m_filtered_log_event_map->size(); | ||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||
length = m_encoded_log_events.size(); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
if (length < end_idx || begin_idx > end_idx) { | ||||||||||||||||||||||||||||||||||||||||
return DecodedResultsTsType{emscripten::val::null()}; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
std::string message; | ||||||||||||||||||||||||||||||||||||||||
constexpr size_t cDefaultReservedMessageLength{512}; | ||||||||||||||||||||||||||||||||||||||||
message.reserve(cDefaultReservedMessageLength); | ||||||||||||||||||||||||||||||||||||||||
auto const results{emscripten::val::array()}; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
for (size_t i = begin_idx; i < end_idx; ++i) { | ||||||||||||||||||||||||||||||||||||||||
size_t log_event_idx{0}; | ||||||||||||||||||||||||||||||||||||||||
if (use_filter) { | ||||||||||||||||||||||||||||||||||||||||
log_event_idx = m_filtered_log_event_map->at(i); | ||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||
log_event_idx = i; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
auto const& log_event{m_encoded_log_events[log_event_idx]}; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
auto const parsed{log_event.get_message().decode_and_unparse()}; | ||||||||||||||||||||||||||||||||||||||||
if (false == parsed.has_value()) { | ||||||||||||||||||||||||||||||||||||||||
SPDLOG_ERROR("Failed to decode message."); | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Necessary since we have the exception? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Necessary since we have the exception? |
||||||||||||||||||||||||||||||||||||||||
throw ClpFfiJsException{ | ||||||||||||||||||||||||||||||||||||||||
clp::ErrorCode::ErrorCode_Failure, | ||||||||||||||||||||||||||||||||||||||||
__FILENAME__, | ||||||||||||||||||||||||||||||||||||||||
__LINE__, | ||||||||||||||||||||||||||||||||||||||||
"Failed to decode message" | ||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
message = parsed.value(); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
EM_ASM( | ||||||||||||||||||||||||||||||||||||||||
{ Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, | ||||||||||||||||||||||||||||||||||||||||
results.as_handle(), | ||||||||||||||||||||||||||||||||||||||||
message.c_str(), | ||||||||||||||||||||||||||||||||||||||||
log_event.get_timestamp(), | ||||||||||||||||||||||||||||||||||||||||
log_event.get_log_level(), | ||||||||||||||||||||||||||||||||||||||||
log_event_idx + 1 | ||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
return DecodedResultsTsType(results); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
IrStreamReader::IrStreamReader( | ||||||||||||||||||||||||||||||||||||||||
StreamReaderDataContext<four_byte_encoded_variable_t>&& stream_reader_data_context | ||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||
: m_stream_reader_data_context{std::make_unique< | ||||||||||||||||||||||||||||||||||||||||
StreamReaderDataContext<four_byte_encoded_variable_t>>( | ||||||||||||||||||||||||||||||||||||||||
std::move(stream_reader_data_context) | ||||||||||||||||||||||||||||||||||||||||
)}, | ||||||||||||||||||||||||||||||||||||||||
m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
auto IrStreamReader::create_data_context( | ||||||||||||||||||||||||||||||||||||||||
std::unique_ptr<clp::streaming_compression::zstd::Decompressor>&& zstd_decompressor, | ||||||||||||||||||||||||||||||||||||||||
clp::Array<char>&& data_buffer | ||||||||||||||||||||||||||||||||||||||||
) -> StreamReaderDataContext<four_byte_encoded_variable_t> { | ||||||||||||||||||||||||||||||||||||||||
rewind_reader_and_validate_encoding_type(*zstd_decompressor); | ||||||||||||||||||||||||||||||||||||||||
auto result{ | ||||||||||||||||||||||||||||||||||||||||
clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>::create(*zstd_decompressor) | ||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||
if (result.has_error()) { | ||||||||||||||||||||||||||||||||||||||||
auto const error_code{result.error()}; | ||||||||||||||||||||||||||||||||||||||||
SPDLOG_CRITICAL( | ||||||||||||||||||||||||||||||||||||||||
"Failed to create deserializer: {}:{}", | ||||||||||||||||||||||||||||||||||||||||
error_code.category().name(), | ||||||||||||||||||||||||||||||||||||||||
error_code.message() | ||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Merge this log into the the exception? |
||||||||||||||||||||||||||||||||||||||||
throw ClpFfiJsException{ | ||||||||||||||||||||||||||||||||||||||||
clp::ErrorCode::ErrorCode_Failure, | ||||||||||||||||||||||||||||||||||||||||
__FILENAME__, | ||||||||||||||||||||||||||||||||||||||||
__LINE__, | ||||||||||||||||||||||||||||||||||||||||
"Failed to create deserializer" | ||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
return {std::move(data_buffer), std::move(zstd_decompressor), std::move(result.value())}; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} // namespace clp_ffi_js::ir |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
#ifndef CLP_FFI_JS_IR_IR_STREAM_READER_HPP | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll add a linter for this soon, but the format of header defines should be |
||
#define CLP_FFI_JS_IR_IR_STREAM_READER_HPP | ||
|
||
#include <Array.hpp> | ||
#include <cstddef> | ||
#include <memory> | ||
#include <optional> | ||
#include <streaming_compression/zstd/Decompressor.hpp> | ||
#include <vector> | ||
|
||
#include <clp/ir/types.hpp> | ||
#include <clp/TimestampPattern.hpp> | ||
#include <emscripten/val.h> | ||
|
||
#include <clp_ffi_js/ir/LogEventWithLevel.hpp> | ||
#include <clp_ffi_js/ir/StreamReader.hpp> | ||
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp> | ||
|
||
namespace clp_ffi_js::ir { | ||
using clp::ir::four_byte_encoded_variable_t; | ||
|
||
/** | ||
* Mapping between an index in the filtered log events collection to an index in the unfiltered | ||
* log events collection. | ||
*/ | ||
using FilteredLogEventsMap = std::optional<std::vector<size_t>>; | ||
|
||
/** | ||
* Class to deserialize and decode Zstandard-compressed CLP IRv1 streams as well as format decoded | ||
* log events. | ||
davemarco marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
class IrStreamReader : public StreamReader { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about calling this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about inserting
I see. I guess we can keep
It's how we differentiate unstructured and structured for the casual user. "Text" for unstructured and "JSON" for structured. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. Anyways i changed for now. I think this ffi is not for the layman |
||
friend StreamReader; | ||
|
||
public: | ||
// Destructor | ||
~IrStreamReader() override = default; | ||
|
||
// Disable copy constructor and assignment operator | ||
IrStreamReader(IrStreamReader const&) = delete; | ||
auto operator=(IrStreamReader const&) -> IrStreamReader& = delete; | ||
|
||
// Define default move constructor | ||
IrStreamReader(IrStreamReader&&) = default; | ||
// Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. | ||
auto operator=(IrStreamReader&&) -> IrStreamReader& = delete; | ||
davemarco marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Methods | ||
/** | ||
* @return The number of events buffered. | ||
*/ | ||
davemarco marked this conversation as resolved.
Show resolved
Hide resolved
|
||
[[nodiscard]] auto get_num_events_buffered() const -> size_t override; | ||
|
||
/** | ||
* @return The filtered log events map. | ||
*/ | ||
davemarco marked this conversation as resolved.
Show resolved
Hide resolved
|
||
[[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override; | ||
|
||
/** | ||
* Generates a filtered collection from all log events. | ||
* | ||
* @param log_level_filter Array of selected log levels | ||
*/ | ||
davemarco marked this conversation as resolved.
Show resolved
Hide resolved
|
||
void filter_log_events(LogLevelFilterTsType const& log_level_filter) override; | ||
|
||
/** | ||
* Deserializes all log events in the stream. After the stream has been exhausted, it will be | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deallocation seems like an implementation decision, so perhaps we can keep this note in the implementation's docstring but not in the interface's. @junhaoliao, what do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now I removed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently, I see you've deleted this docstring. I'm fine keeping it this way, but if @junhaoliao wants to keep the part about deallocating the stream after exhaustion, then we need to add the docstring back in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (just to be sure I understand the discussion -
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. One way to do this is: /**
* Deserializes all log events in the stream. After the stream has been exhausted, it will be
* deallocated.
*
* @return The number of successfully deserialized ("valid") log events.
*/ Another way is: /**
* See `StreamReader::deserialize_stream`.
*
* After the stream has been exhausted, it will be deallocated.
*
* @return See `StreamReader::deserialize_stream`.
*/ We usually go with the first approach, but we don't have too many instances of this case, so we could change conventions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can go with the second approach to avoid repetition and potentially docs going out of sync any time in the future. For the second approach, can we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't know Doxygen had see. Yeah, let's use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @davemarco, can you make the change? |
||
* deallocated. | ||
* | ||
* @return The number of successfully deserialized ("valid") log events. | ||
*/ | ||
[[nodiscard]] auto deserialize_stream() -> size_t override; | ||
|
||
/** | ||
* Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered | ||
* (depending on the value of `useFilter`) log events collection. | ||
* | ||
* @param begin_idx | ||
* @param end_idx | ||
* @param use_filter Whether to decode from the filtered or unfiltered log events collection. | ||
* @return An array where each element is a decoded log event represented by an array of: | ||
* - The log event's message | ||
* - The log event's timestamp as milliseconds since the Unix epoch | ||
* - The log event's log level as an integer that indexes into `cLogLevelNames` | ||
* - The log event's number (1-indexed) in the stream | ||
* @return null if any log event in the range doesn't exist (e.g. the range exceeds the number | ||
* of log events in the collection). | ||
*/ | ||
davemarco marked this conversation as resolved.
Show resolved
Hide resolved
|
||
[[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const | ||
-> DecodedResultsTsType override; | ||
|
||
private: | ||
// Constructor | ||
IrStreamReader( | ||
davemarco marked this conversation as resolved.
Show resolved
Hide resolved
|
||
StreamReaderDataContext<four_byte_encoded_variable_t>&& stream_reader_data_context | ||
); | ||
|
||
// Methods | ||
[[nodiscard]] static auto create_data_context( | ||
std::unique_ptr<clp::streaming_compression::zstd::Decompressor>&& zstd_decompressor, | ||
clp::Array<char>&& data_buffer | ||
) -> StreamReaderDataContext<four_byte_encoded_variable_t>; | ||
|
||
// Variables | ||
std::vector<LogEventWithLevel<four_byte_encoded_variable_t>> m_encoded_log_events; | ||
std::unique_ptr<StreamReaderDataContext<four_byte_encoded_variable_t>> | ||
m_stream_reader_data_context; | ||
FilteredLogEventsMap m_filtered_log_event_map; | ||
clp::TimestampPattern m_ts_pattern; | ||
}; | ||
} // namespace clp_ffi_js::ir | ||
|
||
#endif // CLP_FFI_JS_IR_IR_STREAM_READER_HPP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides renaming the class from
StreamReader
toIrStreamReader
, did we make any other changes to this file?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed
create()
since we have effectively moved this to base class. I removed encripten bindings and put them all in base class.