Skip to content

Commit

Permalink
core-clp: Add support for decompressing a specific file split from a …
Browse files Browse the repository at this point in the history
…clp archive into one or more IR files. (y-scope#417)
  • Loading branch information
haiqi96 authored Jun 8, 2024
1 parent ee5d0ba commit 040eb51
Show file tree
Hide file tree
Showing 9 changed files with 522 additions and 19 deletions.
3 changes: 3 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,12 @@ set(SOURCE_FILES_unitTest
src/clp/GlobalSQLiteMetadataDB.hpp
src/clp/Grep.cpp
src/clp/Grep.hpp
src/clp/ir/constants.hpp
src/clp/ir/LogEvent.hpp
src/clp/ir/LogEventDeserializer.cpp
src/clp/ir/LogEventDeserializer.hpp
src/clp/ir/LogEventSerializer.cpp
src/clp/ir/LogEventSerializer.hpp
src/clp/ir/parsing.cpp
src/clp/ir/parsing.hpp
src/clp/ir/parsing.inc
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ set(
../GlobalMySQLMetadataDB.hpp
../GlobalSQLiteMetadataDB.cpp
../GlobalSQLiteMetadataDB.hpp
../ir/constants.hpp
../ir/LogEvent.hpp
../ir/LogEventDeserializer.cpp
../ir/LogEventDeserializer.hpp
../ir/LogEventSerializer.cpp
../ir/LogEventSerializer.hpp
../ir/parsing.cpp
../ir/parsing.hpp
../ir/parsing.inc
Expand Down
169 changes: 168 additions & 1 deletion components/core/src/clp/clp/FileDecompressor.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,59 @@
#include "FileDecompressor.hpp"

#include <boost/filesystem.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>

#include "../spdlog_with_specializations.hpp"
#include "../ir/constants.hpp"
#include "../ir/LogEventSerializer.hpp"
#include "../ir/utils.hpp"

using clp::ir::four_byte_encoded_variable_t;
using clp::ir::LogEventSerializer;
using std::string;

namespace clp::clp {
namespace {
/**
* Renames a temporary IR file and moves it to the output directory.
*
* The new name uses the following format:
* <orig_file_id>_<begin_message_ix>_<end_message_ix>.clp.zst
* @param temp_ir
* @param output_directory
* @param orig_file_id
* @param begin_message_ix
* @param end_message_ix
* @return Whether the IR file is successfully renamed.
*/
bool rename_ir_file(
boost::filesystem::path const& temp_ir_path,
boost::filesystem::path const& output_directory,
string const& file_orig_id,
size_t begin_message_ix,
size_t end_message_ix
) {
auto ir_file_name = file_orig_id;
ir_file_name += "_" + std::to_string(begin_message_ix);
ir_file_name += "_" + std::to_string(end_message_ix);
ir_file_name += ir::cIrFileExtension;

auto const renamed_ir_path = output_directory / ir_file_name;
try {
boost::filesystem::rename(temp_ir_path, renamed_ir_path);
} catch (boost::filesystem::filesystem_error const& e) {
SPDLOG_ERROR(
"Failed to rename from {} to {}. Error: {}",
temp_ir_path.c_str(),
renamed_ir_path.c_str(),
e.what()
);
return false;
}
return true;
}
} // namespace

bool FileDecompressor::decompress_file(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
Expand Down Expand Up @@ -76,4 +122,125 @@ bool FileDecompressor::decompress_file(

return true;
}

bool FileDecompressor::decompress_to_ir(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
string const& temp_output_dir,
streaming_archive::reader::Archive& archive_reader,
size_t ir_target_size
) {
// Open encoded file
if (auto const error_code = archive_reader.open_file(m_encoded_file, file_metadata_ix);
ErrorCode_Success != error_code)
{
if (ErrorCode_errno == error_code) {
SPDLOG_ERROR("Failed to open encoded file, errno={}", errno);
} else {
SPDLOG_ERROR("Failed to open encoded file, error_code={}", error_code);
}
return false;
}

// Generate output directory
if (auto const error_code = create_directory_structure(output_dir, 0700);
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to create directory structure {}, errno={}",
output_dir.c_str(),
errno
);
return false;
}

if (temp_output_dir != output_dir) {
// Generate temporary output directory
if (auto const error_code = create_directory_structure(temp_output_dir, 0700);
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to create directory structure {}, errno={}",
temp_output_dir.c_str(),
errno
);
return false;
}
}

boost::filesystem::path temp_ir_path{temp_output_dir};
auto temp_ir_file = m_encoded_file.get_id_as_string();
temp_ir_file += ir::cIrFileExtension;
temp_ir_path /= temp_ir_file;

auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string();
auto begin_message_ix = m_encoded_file.get_begin_message_ix();

LogEventSerializer<four_byte_encoded_variable_t> ir_serializer;
// Open output IR file
if (false == ir_serializer.open(temp_ir_path.string())) {
SPDLOG_ERROR("Failed to serialize preamble");
return false;
}

while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) {
if (false
== archive_reader
.decompress_message_without_ts(m_encoded_message, m_decompressed_message))
{
SPDLOG_ERROR("Failed to decompress message");
return false;
}

if (ir_serializer.get_serialized_size() >= ir_target_size) {
ir_serializer.close();

auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
if (false
== rename_ir_file(
temp_ir_path,
output_dir,
file_orig_id,
begin_message_ix,
end_message_ix
))
{
return false;
}
begin_message_ix = end_message_ix;

if (false == ir_serializer.open(temp_ir_path.string())) {
SPDLOG_ERROR("Failed to serialize preamble");
return false;
}
}

if (false
== ir_serializer.serialize_log_event(
m_encoded_message.get_ts_in_milli(),
m_decompressed_message
))
{
SPDLOG_ERROR(
"Failed to serialize log event: {} with ts {}",
m_decompressed_message.c_str(),
m_encoded_message.get_ts_in_milli()
);
return false;
}
}
auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
ir_serializer.close();

// NOTE: We don't remove temp_output_dir because we don't know if it existed before this method
// was called.
if (false
== rename_ir_file(temp_ir_path, output_dir, file_orig_id, begin_message_ix, end_message_ix))
{
return false;
}

archive_reader.close_file(m_encoded_file);
return true;
}
} // namespace clp::clp
9 changes: 9 additions & 0 deletions components/core/src/clp/clp/FileDecompressor.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_CLP_FILEDECOMPRESSOR_HPP
#define CLP_CLP_FILEDECOMPRESSOR_HPP

#include <cstddef>
#include <string>

#include "../FileWriter.hpp"
Expand All @@ -24,6 +25,14 @@ class FileDecompressor {
std::unordered_map<std::string, std::string>& temp_path_to_final_path
);

bool decompress_to_ir(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
std::string const& output_dir,
std::string const& temp_output_dir,
streaming_archive::reader::Archive& archive_reader,
size_t ir_target_size
);

private:
// Variables
FileWriter m_decompressed_file_writer;
Expand Down
160 changes: 160 additions & 0 deletions components/core/src/clp/ir/LogEventSerializer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#include "LogEventSerializer.hpp"

#include <string>
#include <string_view>

#include <spdlog/spdlog.h>

#include "../Defs.h"
#include "../ErrorCode.hpp"
#include "../ffi/ir_stream/encoding_methods.hpp"
#include "../ffi/ir_stream/protocol_constants.hpp"
#include "../ir/types.hpp"
#include "../type_utils.hpp"

using std::string;
using std::string_view;

namespace clp::ir {
template <typename encoded_variable_t>
LogEventSerializer<encoded_variable_t>::~LogEventSerializer() {
if (m_is_open) {
SPDLOG_ERROR("clp::ir::LogEventSerializer not closed before being destroyed - output maybe "
"corrupted.");
}
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::open(string const& file_path) -> bool {
if (m_is_open) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

m_serialized_size = 0;
m_num_log_events = 0;
m_ir_buf.clear();

m_writer.open(file_path, FileWriter::OpenMode::CREATE_FOR_WRITING);
m_zstd_compressor.open(m_writer);

bool res{};
if constexpr (std::is_same_v<encoded_variable_t, four_byte_encoded_variable_t>) {
m_prev_event_timestamp = 0;
res = ffi::ir_stream::four_byte_encoding::serialize_preamble(
cTimestampPattern,
cTimestampPatternSyntax,
cTimezoneID,
m_prev_event_timestamp,
m_ir_buf
);
} else {
res = clp::ffi::ir_stream::eight_byte_encoding::serialize_preamble(
cTimestampPattern,
cTimestampPatternSyntax,
cTimezoneID,
m_ir_buf
);
}

if (false == res) {
close_writer();
return false;
}

m_is_open = true;

// Flush the preamble
flush();

return true;
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::flush() -> void {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
m_zstd_compressor.write(
size_checked_pointer_cast<char const>(m_ir_buf.data()),
m_ir_buf.size()
);
m_ir_buf.clear();
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::close() -> void {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
m_ir_buf.push_back(clp::ffi::ir_stream::cProtocol::Eof);
flush();
close_writer();
m_is_open = false;
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::serialize_log_event(
epoch_time_ms_t timestamp,
string_view message
) -> bool {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

string logtype;
bool res{};
auto const buf_size_before_serialization = m_ir_buf.size();
if constexpr (std::is_same_v<encoded_variable_t, eight_byte_encoded_variable_t>) {
res = clp::ffi::ir_stream::eight_byte_encoding::serialize_log_event(
timestamp,
message,
logtype,
m_ir_buf
);
} else {
auto const timestamp_delta = timestamp - m_prev_event_timestamp;
m_prev_event_timestamp = timestamp;
res = clp::ffi::ir_stream::four_byte_encoding::serialize_log_event(
timestamp_delta,
message,
logtype,
m_ir_buf
);
}
if (false == res) {
return false;
}
m_serialized_size += m_ir_buf.size() - buf_size_before_serialization;
++m_num_log_events;
return true;
}

template <typename encoded_variable_t>
auto LogEventSerializer<encoded_variable_t>::close_writer() -> void {
m_zstd_compressor.close();
m_writer.close();
}

// Explicitly declare template specializations so that we can define the template methods in this
// file
template LogEventSerializer<eight_byte_encoded_variable_t>::~LogEventSerializer();
template LogEventSerializer<four_byte_encoded_variable_t>::~LogEventSerializer();
template auto LogEventSerializer<eight_byte_encoded_variable_t>::open(string const& file_path
) -> bool;
template auto LogEventSerializer<four_byte_encoded_variable_t>::open(string const& file_path
) -> bool;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::flush() -> void;
template auto LogEventSerializer<four_byte_encoded_variable_t>::flush() -> void;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::close() -> void;
template auto LogEventSerializer<four_byte_encoded_variable_t>::close() -> void;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::serialize_log_event(
epoch_time_ms_t timestamp,
string_view message
) -> bool;
template auto LogEventSerializer<four_byte_encoded_variable_t>::serialize_log_event(
epoch_time_ms_t timestamp,
string_view message
) -> bool;
template auto LogEventSerializer<eight_byte_encoded_variable_t>::close_writer() -> void;
template auto LogEventSerializer<four_byte_encoded_variable_t>::close_writer() -> void;
} // namespace clp::ir
Loading

0 comments on commit 040eb51

Please sign in to comment.