Split StreamReader into an interface and implementation to prepare for adding another IR stream reader. #26

Merged
merged 42 commits into from
Nov 7, 2024
Changes from 34 commits
3dae806
Refactor StreamReader to modularize decoding logic.
junhaoliao Oct 15, 2024
ab32045
Add a comment section for methods in StreamReader.hpp.
junhaoliao Oct 15, 2024
8dc3dff
Add `get_version` to decoding methods.
junhaoliao Oct 15, 2024
eba1d30
Move `#include <emscripten/bind.h>` from StreamReader.hpp to StreamRe…
junhaoliao Oct 15, 2024
5398d1d
Rename StreamReader -> IRStreamReader.
junhaoliao Oct 15, 2024
cfd55ca
Rename ClpIrStreamReader to ClpIRStreamReader in bindings.
junhaoliao Oct 15, 2024
d1ed2f0
Extract a StreamReader base class from IRStreamReader.
junhaoliao Oct 15, 2024
bf5e4c9
Rename IRStreamReader -> IrStreamReader.
junhaoliao Oct 15, 2024
ad306f2
merging main into junhao PR
davemarco Oct 31, 2024
c474bb4
some restructure
davemarco Nov 1, 2024
f4a6896
fix cmake
davemarco Nov 1, 2024
dba3910
more changes after test
davemarco Nov 1, 2024
9ddd974
junhao changes
davemarco Nov 4, 2024
a5cb529
fix lint
davemarco Nov 4, 2024
cd50c9a
version methodology change + better error handling
davemarco Nov 4, 2024
d2cf122
lint fix
davemarco Nov 4, 2024
7fe3a58
Update src/clp_ffi_js/ir/decoding_methods.hpp
davemarco Nov 5, 2024
9b67be9
fix print statement
davemarco Nov 5, 2024
fb2b290
fix lint
davemarco Nov 5, 2024
464cf31
forgot one comment, adding type for log level filter
davemarco Nov 5, 2024
6e8d025
Update src/clp_ffi_js/ir/StreamReader.hpp
davemarco Nov 5, 2024
19e41b9
Update src/clp_ffi_js/ir/StreamReader.cpp
davemarco Nov 5, 2024
4988949
Update src/clp_ffi_js/ir/decoding_methods.hpp
davemarco Nov 5, 2024
c79a83f
kirk review comments
davemarco Nov 5, 2024
ef0b86b
fix lint
davemarco Nov 5, 2024
d9e9e97
Apply suggestions from code review
davemarco Nov 6, 2024
efaeedd
kirk review
davemarco Nov 6, 2024
3764298
rename
davemarco Nov 6, 2024
c81cd4b
cleanup
davemarco Nov 6, 2024
28d4250
fix seek issue
davemarco Nov 6, 2024
c5db9a6
Update src/clp_ffi_js/ir/StreamReader.cpp
davemarco Nov 6, 2024
4a83bce
Update src/clp_ffi_js/ir/StreamReader.hpp
davemarco Nov 6, 2024
de7a2f8
Update src/clp_ffi_js/ir/StreamReader.hpp
davemarco Nov 6, 2024
1b31f08
Apply suggestions from code review
davemarco Nov 6, 2024
226ab3a
Update src/clp_ffi_js/ir/StreamReader.cpp
davemarco Nov 6, 2024
fa4debd
change to pass by value
davemarco Nov 6, 2024
4108398
kirk changes
davemarco Nov 6, 2024
c7a8fbf
change submodule
davemarco Nov 6, 2024
d5cad52
Merge branch 'main' into extractBaseClass
davemarco Nov 6, 2024
bfea301
Apply suggestions from code review
davemarco Nov 6, 2024
72bdf6e
revert submodule change
davemarco Nov 6, 2024
119de6c
use see
davemarco Nov 7, 2024
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -113,8 +113,8 @@ target_include_directories(
target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/)

set(CLP_FFI_JS_SRC_MAIN
src/clp_ffi_js/ir/decoding_methods.cpp
src/clp_ffi_js/ir/StreamReader.cpp
src/clp_ffi_js/ir/UnstructuredIrStreamReader.cpp
)

set(CLP_FFI_JS_SRC_CLP_CORE
330 changes: 121 additions & 209 deletions src/clp_ffi_js/ir/StreamReader.cpp
@@ -3,249 +3,122 @@
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <format>
#include <json/single_include/nlohmann/json.hpp>
Member

We need to add json to src/clp-ffi-js/.clang-format.

#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ir/LogEventDeserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <clp/ReaderInterface.hpp>
#include <clp/streaming_compression/zstd/Decompressor.hpp>
#include <clp/TraceableException.hpp>
#include <clp/type_utils.hpp>
#include <emscripten/em_asm.h>
#include <emscripten/val.h>
#include <emscripten/bind.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/decoding_methods.hpp>
#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>
#include <clp_ffi_js/ir/UnstructuredIrStreamReader.hpp>

using namespace std::literals::string_literals;
using clp::ir::four_byte_encoded_variable_t;

namespace clp_ffi_js::ir {
auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader {
auto const length{data_array["length"].as<size_t>()};
SPDLOG_INFO("StreamReader::create: got buffer of length={}", length);

// Copy array from JavaScript to C++
clp::Array<char> data_buffer{length};
// NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast)
emscripten::val::module_property("HEAPU8")
.call<void>("set", data_array, reinterpret_cast<uintptr_t>(data_buffer.data()));
// NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast)

auto zstd_decompressor{std::make_unique<clp::streaming_compression::zstd::Decompressor>()};
zstd_decompressor->open(data_buffer.data(), length);

auto stream_reader_data_context{
create_data_context(std::move(zstd_decompressor), std::move(data_buffer))
};
return StreamReader{std::move(stream_reader_data_context)};
}

auto StreamReader::get_num_events_buffered() const -> size_t {
return m_encoded_log_events.size();
}

auto StreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType {
if (false == m_filtered_log_event_map.has_value()) {
return FilteredLogEventMapTsType{emscripten::val::null()};
}

return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())};
}

void StreamReader::filter_log_events(emscripten::val const& log_level_filter) {
if (log_level_filter.isNull()) {
m_filtered_log_event_map.reset();
return;
}

m_filtered_log_event_map.emplace();
auto filter_levels{emscripten::vecFromJSArray<std::underlying_type_t<LogLevel>>(log_level_filter
)};
for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) {
auto const& log_event = m_encoded_log_events[log_event_idx];
if (std::ranges::find(
filter_levels,
clp::enum_to_underlying_type(log_event.get_log_level())
)
!= filter_levels.end())
{
m_filtered_log_event_map->emplace_back(log_event_idx);
}
}
}

auto StreamReader::deserialize_stream() -> size_t {
if (nullptr == m_stream_reader_data_context) {
return m_encoded_log_events.size();
}

constexpr size_t cDefaultNumReservedLogEvents{500'000};
m_encoded_log_events.reserve(cDefaultNumReservedLogEvents);

while (true) {
auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()};
if (result.has_error()) {
auto const error{result.error()};
if (std::errc::no_message_available == error) {
break;
}
if (std::errc::result_out_of_range == error) {
SPDLOG_ERROR("File contains an incomplete IR stream");
break;
}
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Corrupt,
__FILENAME__,
__LINE__,
"Failed to deserialize: "s + error.category().name() + ":" + error.message()
};
}
auto const& log_event = result.value();
auto const& message = log_event.get_message();

auto const& logtype = message.get_logtype();
constexpr size_t cLogLevelPositionInMessages{1};
LogLevel log_level{LogLevel::NONE};
if (logtype.length() > cLogLevelPositionInMessages) {
// NOLINTNEXTLINE(readability-qualified-auto)
auto const log_level_name_it{std::find_if(
cLogLevelNames.begin() + static_cast<size_t>(cValidLogLevelsBeginIdx),
cLogLevelNames.end(),
[&](std::string_view level) {
return logtype.substr(cLogLevelPositionInMessages).starts_with(level);
}
)};
if (log_level_name_it != cLogLevelNames.end()) {
log_level = static_cast<LogLevel>(
std::distance(cLogLevelNames.begin(), log_level_name_it)
);
}
}

auto log_viewer_event{LogEventWithLevel<four_byte_encoded_variable_t>(
log_event.get_timestamp(),
log_event.get_utc_offset(),
message,
log_level
)};
m_encoded_log_events.emplace_back(std::move(log_viewer_event));
}
m_stream_reader_data_context.reset(nullptr);
return m_encoded_log_events.size();
}

auto StreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType {
if (use_filter && false == m_filtered_log_event_map.has_value()) {
return DecodedResultsTsType{emscripten::val::null()};
}

size_t length{0};
if (use_filter) {
length = m_filtered_log_event_map->size();
} else {
length = m_encoded_log_events.size();
}
if (length < end_idx || begin_idx > end_idx) {
return DecodedResultsTsType{emscripten::val::null()};
namespace {
using ClpFfiJsException = clp_ffi_js::ClpFfiJsException;
using IRErrorCode = clp::ffi::ir_stream::IRErrorCode;

/**
* Rewinds the reader to the beginning then validates the CLP IR data encoding type.
* @param reader
* @throws ClpFfiJsException if the encoding type couldn't be decoded or the encoding type is
* unsupported.
*/
auto rewind_reader_and_validate_encoding_type(clp::ReaderInterface& reader) -> void {
reader.seek_from_begin(0);

bool is_four_bytes_encoding{true};
if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)};
IRErrorCode::IRErrorCode_Success != err)
{
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_MetadataCorrupted,
__FILENAME__,
__LINE__,
std::format(
"Failed to decode encoding type: IR error code {}",
clp::enum_to_underlying_type(err)
)
};
}

std::string message;
constexpr size_t cDefaultReservedMessageLength{512};
message.reserve(cDefaultReservedMessageLength);
auto const results{emscripten::val::array()};

for (size_t i = begin_idx; i < end_idx; ++i) {
size_t log_event_idx{0};
if (use_filter) {
log_event_idx = m_filtered_log_event_map->at(i);
} else {
log_event_idx = i;
}
auto const& log_event{m_encoded_log_events[log_event_idx]};

auto const parsed{log_event.get_message().decode_and_unparse()};
if (false == parsed.has_value()) {
SPDLOG_ERROR("Failed to decode message.");
break;
}
message = parsed.value();

m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message);

EM_ASM(
{ Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); },
results.as_handle(),
message.c_str(),
log_event.get_timestamp(),
log_event.get_log_level(),
log_event_idx + 1
);
if (false == is_four_bytes_encoding) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
"IR stream uses unsupported encoding."
};
}

return DecodedResultsTsType(results);
}

StreamReader::StreamReader(
StreamReaderDataContext<four_byte_encoded_variable_t>&& stream_reader_data_context
)
: m_stream_reader_data_context{std::make_unique<
StreamReaderDataContext<four_byte_encoded_variable_t>>(
std::move(stream_reader_data_context)
)},
m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {}

auto StreamReader::create_data_context(
std::unique_ptr<clp::streaming_compression::zstd::Decompressor>&& zstd_decompressor,
clp::Array<char> data_buffer
) -> StreamReaderDataContext<four_byte_encoded_variable_t> {
rewind_reader_and_validate_encoding_type(*zstd_decompressor);

auto result{
clp::ir::LogEventDeserializer<four_byte_encoded_variable_t>::create(*zstd_decompressor)
/**
* Gets the version of the IR stream.
* @param reader
* @throws ClpFfiJsException if the preamble couldn't be deserialized.
* @return The IR stream's version.
*/
auto get_version(clp::ReaderInterface& reader) -> std::string {
// Deserialize metadata bytes from preamble.
clp::ffi::ir_stream::encoded_tag_t metadata_type{};
std::vector<int8_t> metadata_bytes;
auto const err{clp::ffi::ir_stream::deserialize_preamble(reader, metadata_type, metadata_bytes)
};
if (result.has_error()) {
auto const error_code{result.error()};
SPDLOG_CRITICAL(
"Failed to create deserializer: {}:{}",
error_code.category().name(),
error_code.message()
);
if (IRErrorCode::IRErrorCode_Success != err) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
"Failed to create deserializer"
std::format(
"Failed to deserialize preamble: IR error code {}",
clp::enum_to_underlying_type(err)
)
};
}

return {std::move(data_buffer), std::move(zstd_decompressor), std::move(result.value())};
std::string version;
try {
// Deserialize metadata bytes as JSON.
std::string_view const metadata_view{
clp::size_checked_pointer_cast<char const>(metadata_bytes.data()),
metadata_bytes.size()
};
nlohmann::json const metadata = nlohmann::json::parse(metadata_view);
version = metadata.at(clp::ffi::ir_stream::cProtocol::Metadata::VersionKey);
} catch (nlohmann::json::exception const& e) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_MetadataCorrupted,
__FILENAME__,
__LINE__,
std::format("Failed to parse stream's metadata: {}", e.what())
};
}

SPDLOG_INFO("IR version is {}", version);
return version;
Comment on lines +76 to +113

⚠️ Potential issue

Add metadata type validation.

The function deserializes the preamble but doesn't validate if the metadata_type is correct before attempting to parse it as JSON.

Add validation like this:

     if (IRErrorCode::IRErrorCode_Success != err) {
         throw ClpFfiJsException{
                 clp::ErrorCode::ErrorCode_Failure,
                 __FILENAME__,
                 __LINE__,
                 std::format(
                         "Failed to deserialize preamble: IR error code {}",
                         clp::enum_to_underlying_type(err)
                 )
         };
     }
+    if (metadata_type != clp::ffi::ir_stream::cProtocol::Tags::Metadata) {
+        throw ClpFfiJsException{
+                clp::ErrorCode::ErrorCode_MetadataCorrupted,
+                __FILENAME__,
+                __LINE__,
+                std::format("Invalid metadata type: {}", metadata_type)
+        };
+    }

Suggested change
auto get_version(clp::ReaderInterface& reader) -> std::string {
// Deserialize metadata bytes from preamble.
clp::ffi::ir_stream::encoded_tag_t metadata_type{};
std::vector<int8_t> metadata_bytes;
auto const err{clp::ffi::ir_stream::deserialize_preamble(reader, metadata_type, metadata_bytes)
};
if (IRErrorCode::IRErrorCode_Success != err) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
std::format(
"Failed to deserialize preamble: IR error code {}",
clp::enum_to_underlying_type(err)
)
};
}
if (metadata_type != clp::ffi::ir_stream::cProtocol::Tags::Metadata) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_MetadataCorrupted,
__FILENAME__,
__LINE__,
std::format("Invalid metadata type: {}", metadata_type)
};
}
std::string version;
try {
// Deserialize metadata bytes as JSON.
std::string_view const metadata_view{
clp::size_checked_pointer_cast<char const>(metadata_bytes.data()),
metadata_bytes.size()
};
nlohmann::json const metadata = nlohmann::json::parse(metadata_view);
version = metadata.at(clp::ffi::ir_stream::cProtocol::Metadata::VersionKey);
} catch (nlohmann::json::exception const& e) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_MetadataCorrupted,
__FILENAME__,
__LINE__,
std::format("Failed to parse stream's metadata: {}", e.what())
};
}
SPDLOG_INFO("IR version is {}", version);
return version;

}
} // namespace clp_ffi_js::ir

namespace {
EMSCRIPTEN_BINDINGS(ClpIrStreamReader) {
EMSCRIPTEN_BINDINGS(ClpStreamReader) {
// JS types used as inputs
emscripten::register_type<clp_ffi_js::ir::DataArrayTsType>("Uint8Array");
emscripten::register_type<clp_ffi_js::ir::LogLevelFilterTsType>("number[] | null");

// JS types used as outputs
emscripten::register_type<clp_ffi_js::ir::DecodedResultsTsType>(
"Array<[string, number, number, number]>"
);
emscripten::register_type<clp_ffi_js::ir::FilteredLogEventMapTsType>("number[] | null");

emscripten::class_<clp_ffi_js::ir::StreamReader>("ClpIrStreamReader")
emscripten::class_<clp_ffi_js::ir::StreamReader>("ClpStreamReader")
.constructor(
&clp_ffi_js::ir::StreamReader::create,
emscripten::return_value_policy::take_ownership()
@@ -263,3 +136,42 @@ EMSCRIPTEN_BINDINGS(ClpIrStreamReader) {
.function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range);
}
} // namespace

namespace clp_ffi_js::ir {

davemarco marked this conversation as resolved.
auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr<StreamReader> {
auto const length{data_array["length"].as<size_t>()};
SPDLOG_INFO("StreamReader::create: got buffer of length={}", length);

// Copy array from JavaScript to C++.
clp::Array<char> data_buffer{length};
// NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast)
emscripten::val::module_property("HEAPU8")
.call<void>("set", data_array, reinterpret_cast<uintptr_t>(data_buffer.data()));
// NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast)

auto zstd_decompressor{std::make_unique<ZstdDecompressor>()};
zstd_decompressor->open(data_buffer.data(), length);

// Required to validate encoding type prior to getting version.
rewind_reader_and_validate_encoding_type(*zstd_decompressor);
auto const version{get_version(*zstd_decompressor)};

// Required that reader offset matches position after validation in order to decode log events.
rewind_reader_and_validate_encoding_type(*zstd_decompressor);

🛠️ Refactor suggestion

Optimize stream validation flow.

The encoding type is validated twice unnecessarily. As suggested in previous reviews, we can optimize this by:

  1. Rewind + validate encoding type once
  2. Save position
  3. Validate version
  4. Restore position
  5. Create and return the reader

Apply this restructuring:

-    // Required to validate encoding type prior to getting version.
-    rewind_reader_and_validate_encoding_type(*zstd_decompressor);
+    rewind_reader_and_validate_encoding_type(*zstd_decompressor);
+    
+    // Save position after encoding validation
+    auto pos = zstd_decompressor->get_pos();
+    
+    // Get and validate version
     auto const version{get_version(*zstd_decompressor)};
-
-    // Required that reader offset matches position after validation in order to decode log events.
-    rewind_reader_and_validate_encoding_type(*zstd_decompressor);
+    
+    // Restore position for decoding
+    try {
+        zstd_decompressor->seek_from_begin(pos);
+    } catch (ZstdDecompressor::OperationFailed& e) {
+        throw ClpFfiJsException{
+                clp::ErrorCode::ErrorCode_Failure,
+                __FILENAME__,
+                __LINE__,
+                std::format("Unable to rewind zstd decompressor: {}", e.what())
+        };
+    }

Suggested change
rewind_reader_and_validate_encoding_type(*zstd_decompressor);
// Save position after encoding validation
auto pos = zstd_decompressor->get_pos();
// Get and validate version
auto const version{get_version(*zstd_decompressor)};
// Restore position for decoding
try {
zstd_decompressor->seek_from_begin(pos);
} catch (ZstdDecompressor::OperationFailed& e) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
std::format("Unable to rewind zstd decompressor: {}", e.what())
};
}

if (std::ranges::find(cUnstructuredIrVersions, version) != cUnstructuredIrVersions.end()) {
return std::make_unique<UnstructuredIrStreamReader>(UnstructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer)
));
}
SPDLOG_INFO("did i get here 3");
Contributor Author

I left this here by accident. I will remove it. I just don't want to commit while Kirk is reviewing.


throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
std::format("Unable to create reader for IR stream with version {}.", version)
};
Member

Suggested change
rewind_reader_and_validate_encoding_type(*zstd_decompressor);
// Validate the stream's version
auto pos = zstd_decompressor->get_pos();
auto const version{get_version(*zstd_decompressor)};
if (std::ranges::find(cUnstructuredIrVersions, version) == cUnstructuredIrVersions.end()) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Unsupported,
__FILENAME__,
__LINE__,
std::format("Unable to create reader for IR stream with version {}.", version)
};
}
try {
zstd_decompressor->seek_from_begin(pos);
} catch (ZstdDecompressor::OperationFailed& e) {
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
std::format("Unable to rewind zstd decompressor: {}", e.what())
};
}
return std::make_unique<UnstructuredIrStreamReader>(
UnstructuredIrStreamReader::create(std::move(zstd_decompressor), std::move(data_buffer))
);

Member

This does a few things:

  • Refactors the code structure a bit in terms of comments and flow
    • Note that the if block now handles the error path, whereas the previous code handled the success path there
  • Updates the flow to avoid validating the encoding type twice:
    • Rewind + validate encoding type
    • Save position
    • Validate version
    • Restore position
    • Create and return the reader
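
The flow listed above (rewind and validate, save the position, read the version, restore the position) can be sketched with a toy in-memory reader. This is a minimal sketch under stated assumptions: `ToyReader`, `validate_magic`, `peek_version`, and the byte layout (a 4-byte magic, a one-digit version length, then the version string) are all hypothetical stand-ins, not the CLP `ReaderInterface` or the real IR format.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical seekable reader over an in-memory buffer (illustration only).
class ToyReader {
public:
    explicit ToyReader(std::string data) : m_data{std::move(data)} {}

    [[nodiscard]] auto get_pos() const -> std::size_t { return m_pos; }

    auto seek_from_begin(std::size_t pos) -> void {
        if (pos > m_data.size()) {
            throw std::out_of_range{"seek past end of buffer"};
        }
        m_pos = pos;
    }

    // Reads exactly `len` bytes and advances the cursor.
    auto read(std::size_t len) -> std::string {
        if (len > m_data.size() - m_pos) {
            throw std::out_of_range{"read past end of buffer"};
        }
        auto out{m_data.substr(m_pos, len)};
        m_pos += len;
        return out;
    }

private:
    std::string m_data;
    std::size_t m_pos{0};
};

// Assumed magic bytes for the toy format.
constexpr std::string_view cToyMagic{"TOY!"};

// Step 1: rewind, then validate the leading magic bytes.
auto validate_magic(ToyReader& reader) -> void {
    reader.seek_from_begin(0);
    if (reader.read(cToyMagic.size()) != cToyMagic) {
        throw std::runtime_error{"unsupported encoding"};
    }
}

// Steps 2-4: save the position, read the version, restore the position.
// Note: if a read throws, the position is not restored in this sketch.
auto peek_version(ToyReader& reader) -> std::string {
    auto const saved_pos{reader.get_pos()};
    auto const len{static_cast<std::size_t>(reader.read(1).front() - '0')};
    auto version{reader.read(len)};
    reader.seek_from_begin(saved_pos);
    assert(reader.get_pos() == saved_pos);
    return version;
}
```

After `peek_version` returns, the cursor sits just past the magic bytes again, so whatever consumes the stream next can read the preamble itself without the magic being validated a second time.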

Member

It might be better for you to copy and commit this manually rather than applying the change from GH, just so that you can double-check what I did.

Contributor Author

The question is, did you test it? I tried something similar but it did not work. I will test it for you and let you know.

Member

Nope, hence why I wanted you to double-check, lol.

Contributor Author

Okay, it seems to work now. I don't know why mine was giving an error, but whatever, I guess.

}
} // namespace clp_ffi_js::ir
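
For readers skimming the thread, the overall shape of the split (an abstract reader plus a concrete implementation chosen by version inside a `create` factory) can be sketched as follows. This is an illustration only: `ToyStreamReader`, `ToyUnstructuredReader`, `create_toy_reader`, and the version strings in `cToyUnstructuredVersions` are hypothetical placeholders, not the classes or values in this PR. It uses `std::find` rather than `std::ranges::find` so that it also builds as C++17.

```cpp
#include <algorithm>
#include <array>
#include <memory>
#include <stdexcept>
#include <string>
#include <string_view>

// Hypothetical interface that callers program against.
class ToyStreamReader {
public:
    virtual ~ToyStreamReader() = default;
    [[nodiscard]] virtual auto get_kind() const -> std::string_view = 0;
};

// Hypothetical implementation for one family of stream versions.
class ToyUnstructuredReader : public ToyStreamReader {
public:
    [[nodiscard]] auto get_kind() const -> std::string_view override {
        return "unstructured";
    }
};

// Placeholder version list; the real supported versions are not assumed here.
constexpr std::array<std::string_view, 2> cToyUnstructuredVersions{"0.0.1", "0.0.2"};

// Handles the error path first, then returns the matching implementation
// behind the interface type.
auto create_toy_reader(std::string_view version) -> std::unique_ptr<ToyStreamReader> {
    if (std::find(cToyUnstructuredVersions.begin(), cToyUnstructuredVersions.end(), version)
        == cToyUnstructuredVersions.end())
    {
        throw std::invalid_argument{"Unsupported version: " + std::string{version}};
    }
    return std::make_unique<ToyUnstructuredReader>();
}
```

Adding a second reader in this shape would mean adding another subclass and another version check in the factory, while callers keep holding a `std::unique_ptr<ToyStreamReader>`.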