Skip to content

Commit

Permalink
Reformat; Add missing files to VCS.
Browse files Browse the repository at this point in the history
  • Loading branch information
junhaoliao committed Nov 7, 2024
1 parent fc08027 commit 8d20583
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 36 deletions.
34 changes: 17 additions & 17 deletions src/clp_ffi_js/ir/StreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/ir/UnstructuredIrStreamReader.hpp>
#include <clp_ffi_js/ir/StructuredIrStreamReader.hpp>
#include <clp_ffi_js/ir/UnstructuredIrStreamReader.hpp>

namespace {
using ClpFfiJsException = clp_ffi_js::ClpFfiJsException;
Expand Down Expand Up @@ -183,9 +183,9 @@ auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const
std::move(data_buffer)
));
}
// if (clp::ffi::ir_stream::IRProtocolErrorCode_Supported
// == clp::ffi::ir_stream::validate_protocol_version(version))
// {
// if (clp::ffi::ir_stream::IRProtocolErrorCode_Supported
// == clp::ffi::ir_stream::validate_protocol_version(version))
// {
// FIXME: wait for https://github.com/y-scope/clp/pull/573
try {
zstd_decompressor->seek_from_begin(0);
Expand All @@ -197,18 +197,18 @@ auto StreamReader::create(DataArrayTsType const& data_array, ReaderOptions const
std::format("Unable to rewind zstd decompressor: {}", e.what())
};
}
return std::make_unique<StructuredIrStreamReader>(StructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer),
reader_options
));
// }

// throw ClpFfiJsException{
// clp::ErrorCode::ErrorCode_Unsupported,
// __FILENAME__,
// __LINE__,
// std::format("Unable to create reader for IR stream with version {}.", version)
// };
return std::make_unique<StructuredIrStreamReader>(StructuredIrStreamReader::create(
std::move(zstd_decompressor),
std::move(data_buffer),
reader_options
));
// }

// throw ClpFfiJsException{
// clp::ErrorCode::ErrorCode_Unsupported,
// __FILENAME__,
// __LINE__,
// std::format("Unable to create reader for IR stream with version {}.", version)
// };
}
} // namespace clp_ffi_js::ir
5 changes: 3 additions & 2 deletions src/clp_ffi_js/ir/StreamReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class StreamReader {
* @return The created instance.
* @throw ClpFfiJsException if any error occurs.
*/
[[nodiscard]] static auto create(DataArrayTsType const& data_array,
ReaderOptions const& reader_options
[[nodiscard]] static auto create(
DataArrayTsType const& data_array,
ReaderOptions const& reader_options
) -> std::unique_ptr<StreamReader>;

// Destructor
Expand Down
1 change: 1 addition & 0 deletions src/clp_ffi_js/ir/StreamReaderDataContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class StreamReaderDataContext {
* @return A reference to the reader.
*/
[[nodiscard]] auto get_reader() -> clp::ReaderInterface& { return *m_reader; }

private:
clp::Array<char> m_data_buffer;
std::unique_ptr<clp::ReaderInterface> m_reader;
Expand Down
195 changes: 195 additions & 0 deletions src/clp_ffi_js/ir/StructuredIrStreamReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#include "StructuredIrStreamReader.hpp"

#include <cstddef>
#include <format>
#include <memory>
#include <string>
#include <string_view>
#include <system_error>
#include <utility>
#include <vector>

#include <clp/Array.hpp>
#include <clp/ErrorCode.hpp>
#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ffi/KeyValuePairLogEvent.hpp>
#include <clp/ffi/Value.hpp>
#include <clp/ir/types.hpp>
#include <clp/TraceableException.hpp>
#include <emscripten/em_asm.h>
#include <emscripten/val.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ClpFfiJsException.hpp>
#include <clp_ffi_js/constants.hpp>
#include <clp_ffi_js/ir/StreamReader.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

namespace clp_ffi_js::ir {

using namespace std::literals::string_literals;
using clp::ir::four_byte_encoded_variable_t;

constexpr std::string_view cLogLevelFilteringNotSupportedPrompt{
"Log level filtering is not yet supported in this reader."
};

auto StructuredIrStreamReader::create(
std::unique_ptr<ZstdDecompressor>&& zstd_decompressor,
clp::Array<char> data_array,
ReaderOptions const& reader_options
) -> StructuredIrStreamReader {
auto deserialized_log_events{std::make_shared<std::vector<clp::ffi::KeyValuePairLogEvent>>()};
auto result{StructuredIrDeserializer::create(
*zstd_decompressor,
IrUnitHandler(
deserialized_log_events,
reader_options["logLevelKey"].as<std::string>(),
reader_options["timestampKey"].as<std::string>()
)
)};
if (result.has_error()) {
auto const error_code{result.error()};
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Failure,
__FILENAME__,
__LINE__,
std::format(
"Failed to create deserializer: {} {}",
error_code.category().name(),
error_code.message()
)
};
}
auto data_context = StreamReaderDataContext<StructuredIrDeserializer>(
std::move(data_array),
std::move(zstd_decompressor),
std::move(result.value())
);
return StructuredIrStreamReader(std::move(data_context), std::move(deserialized_log_events));
}

auto StructuredIrStreamReader::get_num_events_buffered() const -> size_t {
return m_deserialized_log_events->size();
}

auto StructuredIrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType {
SPDLOG_ERROR(cLogLevelFilteringNotSupportedPrompt);
return FilteredLogEventMapTsType{emscripten::val::null()};
}

void StructuredIrStreamReader::filter_log_events(LogLevelFilterTsType const& log_level_filter) {
if (log_level_filter.isNull()) {
return;
}
SPDLOG_ERROR(cLogLevelFilteringNotSupportedPrompt);
}

auto StructuredIrStreamReader::deserialize_stream() -> size_t {
if (nullptr == m_stream_reader_data_context) {
return m_deserialized_log_events->size();
}

constexpr size_t cDefaultNumReservedLogEvents{500'000};
m_deserialized_log_events->reserve(cDefaultNumReservedLogEvents);
auto& reader{m_stream_reader_data_context->get_reader()};
while (true) {
auto result{m_stream_reader_data_context->get_deserializer().deserialize_next_ir_unit(reader
)};
if (false == result.has_error()) {
continue;
}
auto const error{result.error()};
if (std::errc::no_message_available == error || std::errc::operation_not_permitted == error)
{
break;
}
if (std::errc::result_out_of_range == error) {
SPDLOG_ERROR("File contains an incomplete IR stream");
break;
}
throw ClpFfiJsException{
clp::ErrorCode::ErrorCode_Corrupt,
__FILENAME__,
__LINE__,
std::format(
"Failed to deserialize: {}:{}",
error.category().name(),
error.message()
)
};
}
m_level_node_id = m_stream_reader_data_context->get_deserializer()
.get_ir_unit_handler()
.get_level_node_id();
m_timestamp_node_id = m_stream_reader_data_context->get_deserializer()
.get_ir_unit_handler()
.get_timestamp_node_id();
m_stream_reader_data_context.reset(nullptr);
return m_deserialized_log_events->size();
}

auto StructuredIrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const
-> DecodedResultsTsType {
if (use_filter) {
SPDLOG_ERROR(cLogLevelFilteringNotSupportedPrompt);
return DecodedResultsTsType{emscripten::val::null()};
}

if (m_deserialized_log_events->size() < end_idx || begin_idx > end_idx) {
return DecodedResultsTsType{emscripten::val::null()};
}

std::string message;
constexpr size_t cDefaultReservedMessageLength{512};
message.reserve(cDefaultReservedMessageLength);
auto const results{emscripten::val::array()};

for (size_t log_event_idx = begin_idx; log_event_idx < end_idx; ++log_event_idx) {
auto const& log_event{m_deserialized_log_events->at(log_event_idx)};

auto const json{log_event.serialize_to_json()};
if (false == json.has_value()) {
SPDLOG_ERROR("Failed to decode message.");
break;
}

auto const& id_value_pairs{log_event.get_node_id_value_pairs()};
clp::ffi::value_int_t log_level{static_cast<clp::ffi::value_int_t>(LogLevel::NONE)};
if (m_level_node_id.has_value()) {
auto const& log_level_pair{id_value_pairs.at(m_level_node_id.value())};
log_level = log_level_pair.has_value()
? log_level_pair.value().get_immutable_view<clp::ffi::value_int_t>()
: static_cast<clp::ffi::value_int_t>(LogLevel::NONE);
}
clp::ffi::value_int_t timestamp{0};
if (m_timestamp_node_id.has_value()) {
auto const& timestamp_pair{id_value_pairs.at(m_timestamp_node_id.value())};
timestamp = timestamp_pair.has_value()
? timestamp_pair.value().get_immutable_view<clp::ffi::value_int_t>()
: 0;
}

EM_ASM(
{ Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); },
results.as_handle(),
json.value().dump().c_str(),
log_level,
timestamp,
log_event_idx + 1
);
}

return DecodedResultsTsType(results);
}

StructuredIrStreamReader::StructuredIrStreamReader(
StreamReaderDataContext<StructuredIrDeserializer>&& stream_reader_data_context,
std::shared_ptr<std::vector<clp::ffi::KeyValuePairLogEvent>> deserialized_log_events
)
: m_stream_reader_data_context{std::make_unique<
StreamReaderDataContext<StructuredIrDeserializer>>(
std::move(stream_reader_data_context)
)},
m_deserialized_log_events{std::move(deserialized_log_events)} {}
} // namespace clp_ffi_js::ir
29 changes: 12 additions & 17 deletions src/clp_ffi_js/ir/StructuredIrStreamReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@

#include <Array.hpp>
#include <cstddef>
#include <ffi/ir_stream/decoding_methods.hpp>
#include <ffi/KeyValuePairLogEvent.hpp>
#include <ffi/SchemaTree.hpp>
#include <memory>
#include <optional>
#include <string>
#include <time_types.hpp>
#include <utility>
#include <vector>

#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ir/LogEventDeserializer.hpp>
#include <clp/ir/types.hpp>
#include <clp/TimestampPattern.hpp>
#include <emscripten/val.h>
#include <spdlog/spdlog.h>

#include <clp_ffi_js/ir/LogEventWithLevel.hpp>
#include <clp_ffi_js/ir/StreamReader.hpp>
#include <clp_ffi_js/ir/StreamReaderDataContext.hpp>

Expand All @@ -27,18 +29,18 @@ using parsed_tree_node_id_t = std::optional<clp::ffi::SchemaTree::Node::id_t>;
class IrUnitHandler {
public:
IrUnitHandler(
std::vector<clp::ffi::KeyValuePairLogEvent>& deserialized_log_events,
std::shared_ptr<std::vector<clp::ffi::KeyValuePairLogEvent>> deserialized_log_events,
std::string log_level_key,
std::string timestamp_key
)
: m_deserialized_log_events{deserialized_log_events},
: m_deserialized_log_events{std::move(deserialized_log_events)},
m_log_level_key{std::move(log_level_key)},
m_timestamp_key{std::move(timestamp_key)} {}

// Implements `clp::ffi::ir_stream::IrUnitHandlerInterface` interface
[[nodiscard]] auto handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event
) -> clp::ffi::ir_stream::IRErrorCode {
m_deserialized_log_events.emplace_back(std::move(log_event));
m_deserialized_log_events->emplace_back(std::move(log_event));

return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success;
}
Expand All @@ -56,8 +58,8 @@ class IrUnitHandler {
[[maybe_unused]] clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator
) -> clp::ffi::ir_stream::IRErrorCode {
++m_current_node_id;
auto const& key_name{schema_tree_node_locator.get_key_name()};

auto const& key_name{schema_tree_node_locator.get_key_name()};
if (m_log_level_key == key_name) {
m_level_node_id.emplace(m_current_node_id);
} else if (m_timestamp_key == key_name) {
Expand All @@ -67,17 +69,11 @@ class IrUnitHandler {
return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success;
}

// FIXME: do i need this?
[[nodiscard]] static auto handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode {
return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success;
}

// Methods
[[nodiscard]] auto get_deserialized_log_events(
) const -> std::vector<clp::ffi::KeyValuePairLogEvent> const& {
return m_deserialized_log_events;
}

[[nodiscard]] auto get_level_node_id() const -> parsed_tree_node_id_t {
return m_level_node_id;
}
Expand All @@ -90,12 +86,11 @@ class IrUnitHandler {
std::string m_log_level_key;
std::string m_timestamp_key;

// the root node has id=0
clp::ffi::SchemaTree::Node::id_t m_current_node_id;
clp::ffi::SchemaTree::Node::id_t m_current_node_id{clp::ffi::SchemaTree::cRootId};
parsed_tree_node_id_t m_level_node_id;
parsed_tree_node_id_t m_timestamp_node_id;

std::vector<clp::ffi::KeyValuePairLogEvent>& m_deserialized_log_events;
std::shared_ptr<std::vector<clp::ffi::KeyValuePairLogEvent>> m_deserialized_log_events;
bool m_is_complete{false};
};

Expand Down

0 comments on commit 8d20583

Please sign in to comment.