diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 63dc1ab08..c8f76baad 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -335,9 +335,12 @@ set(SOURCE_FILES_unitTest src/clp/GlobalSQLiteMetadataDB.hpp src/clp/Grep.cpp src/clp/Grep.hpp + src/clp/ir/constants.hpp src/clp/ir/LogEvent.hpp src/clp/ir/LogEventDeserializer.cpp src/clp/ir/LogEventDeserializer.hpp + src/clp/ir/LogEventSerializer.cpp + src/clp/ir/LogEventSerializer.hpp src/clp/ir/parsing.cpp src/clp/ir/parsing.hpp src/clp/ir/parsing.inc diff --git a/components/core/src/clp/clp/CMakeLists.txt b/components/core/src/clp/clp/CMakeLists.txt index 9b6563a98..1d8a20860 100644 --- a/components/core/src/clp/clp/CMakeLists.txt +++ b/components/core/src/clp/clp/CMakeLists.txt @@ -36,9 +36,12 @@ set( ../GlobalMySQLMetadataDB.hpp ../GlobalSQLiteMetadataDB.cpp ../GlobalSQLiteMetadataDB.hpp + ../ir/constants.hpp ../ir/LogEvent.hpp ../ir/LogEventDeserializer.cpp ../ir/LogEventDeserializer.hpp + ../ir/LogEventSerializer.cpp + ../ir/LogEventSerializer.hpp ../ir/parsing.cpp ../ir/parsing.hpp ../ir/parsing.inc diff --git a/components/core/src/clp/clp/FileDecompressor.cpp b/components/core/src/clp/clp/FileDecompressor.cpp index 55e53258c..327bfc1ff 100644 --- a/components/core/src/clp/clp/FileDecompressor.cpp +++ b/components/core/src/clp/clp/FileDecompressor.cpp @@ -1,13 +1,59 @@ #include "FileDecompressor.hpp" +#include #include #include -#include "../spdlog_with_specializations.hpp" +#include "../ir/constants.hpp" +#include "../ir/LogEventSerializer.hpp" +#include "../ir/utils.hpp" +using clp::ir::four_byte_encoded_variable_t; +using clp::ir::LogEventSerializer; using std::string; namespace clp::clp { +namespace { +/** + * Renames a temporary IR file and moves it to the output directory. + * + * The new name uses the following format: + * __.clp.zst + * @param temp_ir + * @param output_directory + * @param orig_file_id + * @param begin_message_ix + * @param end_message_ix + * @return Whether the IR file is successfully renamed. + */ +bool rename_ir_file( + boost::filesystem::path const& temp_ir_path, + boost::filesystem::path const& output_directory, + string const& file_orig_id, + size_t begin_message_ix, + size_t end_message_ix +) { + auto ir_file_name = file_orig_id; + ir_file_name += "_" + std::to_string(begin_message_ix); + ir_file_name += "_" + std::to_string(end_message_ix); + ir_file_name += ir::cIrFileExtension; + + auto const renamed_ir_path = output_directory / ir_file_name; + try { + boost::filesystem::rename(temp_ir_path, renamed_ir_path); + } catch (boost::filesystem::filesystem_error const& e) { + SPDLOG_ERROR( + "Failed to rename from {} to {}. Error: {}", + temp_ir_path.c_str(), + renamed_ir_path.c_str(), + e.what() + ); + return false; + } + return true; +} +} // namespace + bool FileDecompressor::decompress_file( streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, string const& output_dir, @@ -76,4 +122,125 @@ bool FileDecompressor::decompress_file( return true; } + +bool FileDecompressor::decompress_to_ir( + streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, + string const& output_dir, + string const& temp_output_dir, + streaming_archive::reader::Archive& archive_reader, + size_t ir_target_size +) { + // Open encoded file + if (auto const error_code = archive_reader.open_file(m_encoded_file, file_metadata_ix); + ErrorCode_Success != error_code) + { + if (ErrorCode_errno == error_code) { + SPDLOG_ERROR("Failed to open encoded file, errno={}", errno); + } else { + SPDLOG_ERROR("Failed to open encoded file, error_code={}", error_code); + } + return false; + } + + // Generate output directory + if (auto const error_code = create_directory_structure(output_dir, 0700); + ErrorCode_Success != error_code) + { + SPDLOG_ERROR( + "Failed to create directory structure {}, errno={}", + output_dir.c_str(), + errno + ); + return false; + } + + if (temp_output_dir != output_dir) { + // Generate temporary output directory + if (auto const error_code = create_directory_structure(temp_output_dir, 0700); + ErrorCode_Success != error_code) + { + SPDLOG_ERROR( + "Failed to create directory structure {}, errno={}", + temp_output_dir.c_str(), + errno + ); + return false; + } + } + + boost::filesystem::path temp_ir_path{temp_output_dir}; + auto temp_ir_file = m_encoded_file.get_id_as_string(); + temp_ir_file += ir::cIrFileExtension; + temp_ir_path /= temp_ir_file; + + auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string(); + auto begin_message_ix = m_encoded_file.get_begin_message_ix(); + + LogEventSerializer ir_serializer; + // Open output IR file + if (false == ir_serializer.open(temp_ir_path.string())) { + SPDLOG_ERROR("Failed to serialize preamble"); + return false; + } + + while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) { + if (false + == archive_reader + .decompress_message_without_ts(m_encoded_message, m_decompressed_message)) + { + SPDLOG_ERROR("Failed to decompress message"); + return false; + } + + if (ir_serializer.get_serialized_size() >= ir_target_size) { + ir_serializer.close(); + + auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events(); + if (false + == rename_ir_file( + temp_ir_path, + output_dir, + file_orig_id, + begin_message_ix, + end_message_ix + )) + { + return false; + } + begin_message_ix = end_message_ix; + + if (false == ir_serializer.open(temp_ir_path.string())) { + SPDLOG_ERROR("Failed to serialize preamble"); + return false; + } + } + + if (false + == ir_serializer.serialize_log_event( + m_encoded_message.get_ts_in_milli(), + m_decompressed_message + )) + { + SPDLOG_ERROR( + "Failed to serialize log event: {} with ts {}", + m_decompressed_message.c_str(), + m_encoded_message.get_ts_in_milli() + ); + return false; + } + } + auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events(); + ir_serializer.close(); + + // NOTE: We don't remove temp_output_dir because we don't know if it existed before this method + // was called. + if (false + == rename_ir_file(temp_ir_path, output_dir, file_orig_id, begin_message_ix, end_message_ix)) + { + return false; + } + + archive_reader.close_file(m_encoded_file); + return true; +} } // namespace clp::clp diff --git a/components/core/src/clp/clp/FileDecompressor.hpp b/components/core/src/clp/clp/FileDecompressor.hpp index 51598a9f4..a31ea705b 100644 --- a/components/core/src/clp/clp/FileDecompressor.hpp +++ b/components/core/src/clp/clp/FileDecompressor.hpp @@ -1,6 +1,7 @@ #ifndef CLP_CLP_FILEDECOMPRESSOR_HPP #define CLP_CLP_FILEDECOMPRESSOR_HPP +#include #include #include "../FileWriter.hpp" @@ -24,6 +25,14 @@ class FileDecompressor { std::unordered_map& temp_path_to_final_path ); + bool decompress_to_ir( + streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, + std::string const& output_dir, + std::string const& temp_output_dir, + streaming_archive::reader::Archive& archive_reader, + size_t ir_target_size + ); + private: // Variables FileWriter m_decompressed_file_writer; diff --git a/components/core/src/clp/ir/LogEventSerializer.cpp b/components/core/src/clp/ir/LogEventSerializer.cpp new file mode 100644 index 000000000..13c94714c --- /dev/null +++ b/components/core/src/clp/ir/LogEventSerializer.cpp @@ -0,0 +1,160 @@ +#include "LogEventSerializer.hpp" + +#include +#include + +#include + +#include "../Defs.h" +#include "../ErrorCode.hpp" +#include "../ffi/ir_stream/encoding_methods.hpp" +#include "../ffi/ir_stream/protocol_constants.hpp" +#include "../ir/types.hpp" +#include "../type_utils.hpp" + +using std::string; +using std::string_view; + +namespace clp::ir { +template +LogEventSerializer::~LogEventSerializer() { + if (m_is_open) { + SPDLOG_ERROR("clp::ir::LogEventSerializer not closed before being destroyed - output maybe " + "corrupted."); + } +} + +template +auto LogEventSerializer::open(string const& file_path) -> bool { + if (m_is_open) { + throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); + } + + m_serialized_size = 0; + m_num_log_events = 0; + m_ir_buf.clear(); + + m_writer.open(file_path, FileWriter::OpenMode::CREATE_FOR_WRITING); + m_zstd_compressor.open(m_writer); + + bool res{}; + if constexpr (std::is_same_v) { + m_prev_event_timestamp = 0; + res = ffi::ir_stream::four_byte_encoding::serialize_preamble( + cTimestampPattern, + cTimestampPatternSyntax, + cTimezoneID, + m_prev_event_timestamp, + m_ir_buf + ); + } else { + res = clp::ffi::ir_stream::eight_byte_encoding::serialize_preamble( + cTimestampPattern, + cTimestampPatternSyntax, + cTimezoneID, + m_ir_buf + ); + } + + if (false == res) { + close_writer(); + return false; + } + + m_is_open = true; + + // Flush the preamble + flush(); + + return true; +} + +template +auto LogEventSerializer::flush() -> void { + if (false == m_is_open) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + m_zstd_compressor.write( + size_checked_pointer_cast(m_ir_buf.data()), + m_ir_buf.size() + ); + m_ir_buf.clear(); +} + +template +auto LogEventSerializer::close() -> void { + if (false == m_is_open) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + m_ir_buf.push_back(clp::ffi::ir_stream::cProtocol::Eof); + flush(); + close_writer(); + m_is_open = false; +} + +template +auto LogEventSerializer::serialize_log_event( + epoch_time_ms_t timestamp, + string_view message +) -> bool { + if (false == m_is_open) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + + string logtype; + bool res{}; + auto const buf_size_before_serialization = m_ir_buf.size(); + if constexpr (std::is_same_v) { + res = clp::ffi::ir_stream::eight_byte_encoding::serialize_log_event( + timestamp, + message, + logtype, + m_ir_buf + ); + } else { + auto const timestamp_delta = timestamp - m_prev_event_timestamp; + m_prev_event_timestamp = timestamp; + res = clp::ffi::ir_stream::four_byte_encoding::serialize_log_event( + timestamp_delta, + message, + logtype, + m_ir_buf + ); + } + if (false == res) { + return false; + } + m_serialized_size += m_ir_buf.size() - buf_size_before_serialization; + ++m_num_log_events; + return true; +} + +template +auto LogEventSerializer::close_writer() -> void { + m_zstd_compressor.close(); + m_writer.close(); +} + +// Explicitly declare template specializations so that we can define the template methods in this +// file +template LogEventSerializer::~LogEventSerializer(); +template LogEventSerializer::~LogEventSerializer(); +template auto LogEventSerializer::open(string const& file_path +) -> bool; +template auto LogEventSerializer::open(string const& file_path +) -> bool; +template auto LogEventSerializer::flush() -> void; +template auto LogEventSerializer::flush() -> void; +template auto LogEventSerializer::close() -> void; +template auto LogEventSerializer::close() -> void; +template auto LogEventSerializer::serialize_log_event( + epoch_time_ms_t timestamp, + string_view message +) -> bool; +template auto LogEventSerializer::serialize_log_event( + epoch_time_ms_t timestamp, + string_view message +) -> bool; +template auto LogEventSerializer::close_writer() -> void; +template auto LogEventSerializer::close_writer() -> void; +} // namespace clp::ir diff --git a/components/core/src/clp/ir/LogEventSerializer.hpp b/components/core/src/clp/ir/LogEventSerializer.hpp new file mode 100644 index 000000000..635ecca9f --- /dev/null +++ b/components/core/src/clp/ir/LogEventSerializer.hpp @@ -0,0 +1,129 @@ +#ifndef CLP_IR_LOGEVENTSERIALIZER_HPP +#define CLP_IR_LOGEVENTSERIALIZER_HPP + +#include +#include +#include +#include +#include +#include + +#include "../ErrorCode.hpp" +#include "../FileWriter.hpp" +#include "../streaming_compression/zstd/Compressor.hpp" +#include "../TraceableException.hpp" +#include "../type_utils.hpp" +#include "types.hpp" + +namespace clp::ir { +/** + * Class for serializing log events into a Zstandard-compressed IR stream. The serializer first + * buffers the serialized data into an internal buffer, and only flushes the buffered IR to disk + * when `flush` or `close` is called. + */ +template +class LogEventSerializer { +public: + // Types + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + + // Methods + [[nodiscard]] auto what() const noexcept -> char const* override { + return "clp::ir::LogEventSerializer operation failed"; + } + }; + + // Constructors + LogEventSerializer() = default; + + // Delete copy constructor and assignment + LogEventSerializer(LogEventSerializer const&) = delete; + auto operator=(LogEventSerializer const&) -> LogEventSerializer& = delete; + + // Define default move constructor and assignment + LogEventSerializer(LogEventSerializer&&) = default; + auto operator=(LogEventSerializer&&) -> LogEventSerializer& = default; + + ~LogEventSerializer(); + + /** + * Creates a Zstandard-compressed IR file on disk, and writes the IR file's preamble. + * @param file_path + * @return true on success, false if serializing the preamble fails + * @throw FileWriter::OperationFailed if the FileWriter fails to open the file specified by + * file_path + * @throw streaming_compression::zstd::Compressor if the Zstandard compressor couldn't be opened + * @throw ir::LogEventSerializer::OperationFailed if an IR file is already open + */ + [[nodiscard]] auto open(std::string const& file_path) -> bool; + + /** + * Flushes any buffered data. + * @throw ir::LogEventSerializer::OperationFailed if no IR file is open + */ + auto flush() -> void; + + /** + * Serializes the EoF tag, flushes the buffer, and closes the current IR stream. + * @throw ir::LogEventSerializer::OperationFailed if no IR file is open + */ + auto close() -> void; + + /** + * @return Size of serialized data in bytes + */ + [[nodiscard]] auto get_serialized_size() const -> size_t { return m_serialized_size; } + + /** + * @return Number of serialized log events. + */ + [[nodiscard]] auto get_num_log_events() const -> size_t { return m_num_log_events; } + + /** + * Serializes the given log event. + * @return Whether the log event was successfully serialized. + */ + [[nodiscard]] auto + serialize_log_event(epoch_time_ms_t timestamp, std::string_view message) -> bool; + +private: + // Constants + // NOTE: IR files currently store the log's timestamp pattern and timezone ID. However: + // - files in CLP archives don't currently support encoding time zones; + // - IR files don't support multiple timestamp patterns whereas files in CLP archives do; + // - No consumers of IR files currently use these fields. + // Accordingly, we store a default timestamp pattern and timezone ID in the IR file's metadata, + // but it is really up to consumers of the IR file to use an appropriate timestamp pattern and + // timezone when rendering log events. + static constexpr std::string_view cTimestampPattern{"%Y-%m-%d %H:%M:%S,%3"}; + static constexpr std::string_view cTimestampPatternSyntax{}; + static constexpr std::string_view cTimezoneID{"UTC"}; + + // Methods + /** + * Closes the member compressor and file writer in the proper order. + */ + auto close_writer() -> void; + + // Variables + size_t m_num_log_events{0}; + size_t m_serialized_size{0}; // Bytes + + [[no_unique_address]] std::conditional_t< + std::is_same_v, + epoch_time_ms_t, + EmptyType> m_prev_event_timestamp{}; + + std::vector m_ir_buf; + FileWriter m_writer; + streaming_compression::zstd::Compressor m_zstd_compressor; + + bool m_is_open{false}; +}; +} // namespace clp::ir + +#endif // CLP_IR_LOGEVENTSERIALIZER_HPP diff --git a/components/core/src/clp/ir/constants.hpp b/components/core/src/clp/ir/constants.hpp new file mode 100644 index 000000000..2c287c2e0 --- /dev/null +++ b/components/core/src/clp/ir/constants.hpp @@ -0,0 +1,10 @@ +#ifndef CLP_IR_CONSTANTS_HPP +#define CLP_IR_CONSTANTS_HPP + +#include + +namespace clp::ir { +constexpr std::string_view cIrFileExtension{".clp.zst"}; +} // namespace clp::ir + +#endif // CLP_IR_CONSTANTS_HPP diff --git a/components/core/src/clp/streaming_archive/reader/Archive.cpp b/components/core/src/clp/streaming_archive/reader/Archive.cpp index a836a3785..b2770690a 100644 --- a/components/core/src/clp/streaming_archive/reader/Archive.cpp +++ b/components/core/src/clp/streaming_archive/reader/Archive.cpp @@ -168,23 +168,7 @@ bool Archive::decompress_message( Message const& compressed_msg, string& decompressed_msg ) { - decompressed_msg.clear(); - - // Build original message content - logtype_dictionary_id_t const logtype_id = compressed_msg.get_logtype_id(); - auto const& logtype_entry = m_logtype_dictionary.get_entry(logtype_id); - if (!EncodedVariableInterpreter::decode_variables_into_message( - logtype_entry, - m_var_dictionary, - compressed_msg.get_vars(), - decompressed_msg - )) - { - SPDLOG_ERROR( - "streaming_archive::reader::Archive: Failed to decompress variables from " - "logtype id {}", - compressed_msg.get_logtype_id() - ); + if (false == decompress_message_without_ts(compressed_msg, decompressed_msg)) { return false; } @@ -216,6 +200,34 @@ bool Archive::decompress_message( return true; } +bool Archive::decompress_message_without_ts( + Message const& compressed_msg, + string& decompressed_msg +) { + decompressed_msg.clear(); + + // Build original message content + auto const logtype_id = compressed_msg.get_logtype_id(); + auto const& logtype_entry = m_logtype_dictionary.get_entry(logtype_id); + if (false + == EncodedVariableInterpreter::decode_variables_into_message( + logtype_entry, + m_var_dictionary, + compressed_msg.get_vars(), + decompressed_msg + )) + { + SPDLOG_ERROR( + "streaming_archive::reader::Archive: Failed to decompress variables from " + "logtype id {}", + compressed_msg.get_logtype_id() + ); + return false; + } + + return true; +} + void Archive::decompress_empty_directories(string const& output_dir) { boost::filesystem::path output_dir_path = boost::filesystem::path(output_dir); diff --git a/components/core/src/clp/streaming_archive/reader/Archive.hpp b/components/core/src/clp/streaming_archive/reader/Archive.hpp index 14792670e..c724be476 100644 --- a/components/core/src/clp/streaming_archive/reader/Archive.hpp +++ b/components/core/src/clp/streaming_archive/reader/Archive.hpp @@ -89,7 +89,8 @@ class Archive { bool get_next_message(File& file, Message& msg); /** - * Decompresses a given message from a given file + * Decompresses the given message from the given file, including inserting and formatting its + * timestamp if necessary. * @param file * @param compressed_msg * @param decompressed_msg @@ -99,6 +100,15 @@ class Archive { bool decompress_message(File& file, Message const& compressed_msg, std::string& decompressed_msg); + /** + * Decompresses the given message without inserting its timestamp. + * @param compressed_msg + * @param decompressed_msg + * @return Whether the message was successfully decompressed + */ + bool + decompress_message_without_ts(Message const& compressed_msg, std::string& decompressed_msg); + void decompress_empty_directories(std::string const& output_dir); std::unique_ptr get_file_iterator_by_split_id(