From edda4b38132bc96b00519b92f1abc07d60e054d2 Mon Sep 17 00:00:00 2001 From: Junhao Liao <junhao@junhao.ca> Date: Thu, 7 Nov 2024 00:56:19 -0500 Subject: [PATCH] Use version checking utility from CLP FFI. --- src/clp_ffi_js/ir/StreamReader.cpp | 61 +++++++++++++----------------- src/clp_ffi_js/ir/StreamReader.hpp | 5 --- 2 files changed, 26 insertions(+), 40 deletions(-) diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index 98e5cb84..f41d79ff 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -1,6 +1,5 @@ #include "StreamReader.hpp" -#include <algorithm> #include <cstddef> #include <cstdint> #include <format> @@ -167,29 +166,27 @@ auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const // Validate the stream's version auto pos = zstd_decompressor->get_pos(); auto const version{get_version(*zstd_decompressor)}; - if (std::ranges::find(cUnstructuredIrVersions, version) != cUnstructuredIrVersions.end()) { - try { + auto const version_validation_result{clp::ffi::ir_stream::validate_protocol_version(version)}; + + try { + if (clp::ffi::ir_stream::IRProtocolErrorCode::Supported == version_validation_result) { + zstd_decompressor->seek_from_begin(0); + return std::make_unique<StructuredIrStreamReader>(StructuredIrStreamReader::create( + std::move(zstd_decompressor), + std::move(data_buffer), + reader_options + )); + } + if (clp::ffi::ir_stream::IRProtocolErrorCode::BackwardCompatible + == version_validation_result) + { zstd_decompressor->seek_from_begin(pos); - } catch (ZstdDecompressor::OperationFailed& e) { - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Failure, - __FILENAME__, - __LINE__, - std::format("Unable to rewind zstd decompressor: {}", e.what()) - }; + return std::make_unique<UnstructuredIrStreamReader>(UnstructuredIrStreamReader::create( + std::move(zstd_decompressor), + std::move(data_buffer) + )); } - return std::make_unique<UnstructuredIrStreamReader>(UnstructuredIrStreamReader::create( - std::move(zstd_decompressor), - std::move(data_buffer) - )); - } - // if (clp::ffi::ir_stream::IRProtocolErrorCode_Supported - // == clp::ffi::ir_stream::validate_protocol_version(version)) - // { - // FIXME: wait for https://github.com/y-scope/clp/pull/573 - try { - zstd_decompressor->seek_from_begin(0); - } catch (ZstdDecompressor::OperationFailed& e) { + } catch (ZstdDecompressor::OperationFailed const& e) { throw ClpFfiJsException{ clp::ErrorCode::ErrorCode_Failure, __FILENAME__, @@ -197,18 +194,12 @@ auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const std::format("Unable to rewind zstd decompressor: {}", e.what()) }; } - return std::make_unique<StructuredIrStreamReader>(StructuredIrStreamReader::create( - std::move(zstd_decompressor), - std::move(data_buffer), - reader_options - )); - // } - - // throw ClpFfiJsException{ - // clp::ErrorCode::ErrorCode_Unsupported, - // __FILENAME__, - // __LINE__, - // std::format("Unable to create reader for IR stream with version {}.", version) - // }; + + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + std::format("Unable to create reader for IR stream with version {}.", version) + }; } } // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index 111b110f..51b1e23a 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -1,10 +1,8 @@ #ifndef CLP_FFI_JS_IR_STREAMREADER_HPP #define CLP_FFI_JS_IR_STREAMREADER_HPP -#include <array> #include <cstddef> #include <memory> -#include <string_view> #include <clp/streaming_compression/zstd/Decompressor.hpp> #include <emscripten/val.h> @@ -19,9 +17,6 @@ EMSCRIPTEN_DECLARE_VAL_TYPE(ReaderOptions); EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); -constexpr std::array<std::string_view, 6> cUnstructuredIrVersions - = {"v0.0.2", "v0.0.1", "v0.0.0", "0.0.2", "0.0.1", "0.0.0"}; - /** * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded * log events.