Skip to content

Commit

Permalink
core-clp: Add CLI command to extract a compressed file as IR. (y-scop…
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 authored Jun 12, 2024
1 parent 8f8495b commit d5fcd6b
Show file tree
Hide file tree
Showing 9 changed files with 438 additions and 200 deletions.
99 changes: 98 additions & 1 deletion components/core/src/clp/clp/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,11 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
cerr << "COMMAND is one of:" << endl;
cerr << " c - compress" << endl;
cerr << " x - extract" << endl;
cerr << " i - extract IR" << endl;
cerr << endl;
cerr << "Try " << get_program_name() << " c --help OR " << get_program_name()
<< " x --help for command-specific details." << endl;
<< " x --help OR " << get_program_name()
<< " i --help for command-specific details." << endl;
cerr << endl;

cerr << "Options can be specified on the command line or through a configuration "
Expand All @@ -163,6 +165,7 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
switch (command_input) {
case (char)Command::Compress:
case (char)Command::Extract:
case (char)Command::ExtractIr:
m_command = (Command)command_input;
break;
default:
Expand Down Expand Up @@ -223,6 +226,95 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
if (m_archives_dir.empty()) {
throw invalid_argument("ARCHIVES_DIR cannot be empty.");
}
} else if (Command::ExtractIr == m_command) {
// Define IR extraction hidden positional options
po::options_description ir_positional_options;
// clang-format off
ir_positional_options.add_options()
("archives-dir", po::value<string>(&m_archives_dir))
("output-dir", po::value<string>(&m_output_dir))
("orig-file-id", po::value<string>(&m_orig_file_id));
// clang-format on
po::positional_options_description ir_positional_options_description;
ir_positional_options_description.add("archives-dir", 1);
ir_positional_options_description.add("output-dir", 1);
ir_positional_options_description.add("orig-file-id", 1);

po::options_description options_ir("IR extraction Options");
options_ir.add_options()(
"msg-ix",
po::value<size_t>(&m_ir_msg_ix)
->value_name("INDEX")
->default_value(m_ir_msg_ix),
"Index of log event that decompressed IR chunks must include"
);
options_ir.add_options()(
"target-size",
po::value<size_t>(&m_ir_target_size)
->value_name("SIZE")
->default_value(m_ir_target_size),
"Target size (B) for each IR chunk before a new chunk is created"
);
options_ir.add_options()(
"temp-output-dir",
po::value<string>(&m_ir_temp_output_dir)
->value_name("DIR")
->default_value(m_ir_temp_output_dir),
"Temporary output directory for IR chunks while they're being written"
);

po::options_description all_ir_options;
all_ir_options.add(ir_positional_options);
all_ir_options.add(options_ir);

// Parse IR extraction options
vector<string> unrecognized_options
= po::collect_unrecognized(parsed.options, po::include_positional);
unrecognized_options.erase(unrecognized_options.begin());
po::store(
po::command_line_parser(unrecognized_options)
.options(all_ir_options)
.positional(ir_positional_options_description)
.run(),
parsed_command_line_options
);

notify(parsed_command_line_options);

// Handle --help
if (parsed_command_line_options.count("help")) {
print_ir_basic_usage();

cerr << "Examples:" << endl;
cerr << " # Extract (original) file with ID 8cf8d8f2-bf3f-42a2-90b2-6bc4ed0a36b4"
" as IR"
<< endl;
cerr << " " << get_program_name()
<< " i archives-dir output-dir 8cf8d8f2-bf3f-42a2-90b2-6bc4ed0a36b4" << endl;
cerr << endl;

po::options_description visible_options;
visible_options.add(options_general);
visible_options.add(options_ir);
cerr << visible_options << endl;
return ParsingResult::InfoCommand;
}

if (m_archives_dir.empty()) {
throw invalid_argument("ARCHIVES_DIR cannot be empty.");
}

if (m_output_dir.empty()) {
throw invalid_argument("OUTPUT_DIR cannot be empty.");
}

if (m_orig_file_id.empty()) {
throw invalid_argument("ORIG_FILE_ID cannot be empty.");
}

if (m_ir_temp_output_dir.empty()) {
m_ir_temp_output_dir = m_output_dir;
}
} else if (Command::Compress == m_command) {
// Define compression hidden positional options
po::options_description compression_positional_options;
Expand Down Expand Up @@ -403,4 +495,9 @@ void CommandLineArguments::print_extraction_basic_usage() const {
cerr << "Usage: " << get_program_name() << " [OPTIONS] x ARCHIVES_DIR OUTPUT_DIR [FILE ...]"
<< endl;
}

void CommandLineArguments::print_ir_basic_usage() const {
cerr << "Usage: " << get_program_name() << " [OPTIONS] i ARCHIVES_DIR OUTPUT_DIR ORIG_FILE_ID"
<< endl;
}
} // namespace clp::clp
14 changes: 14 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
enum class Command : char {
Compress = 'c',
Extract = 'x',
ExtractIr = 'i'
};

// Constructors
Expand All @@ -36,6 +37,8 @@ class CommandLineArguments : public CommandLineArgumentsBase {

std::string const& get_path_prefix_to_remove() const { return m_path_prefix_to_remove; }

std::string const& get_ir_temp_output_dir() const { return m_ir_temp_output_dir; }

std::string const& get_output_dir() const { return m_output_dir; }

std::string const& get_schema_file_path() const { return m_schema_file_path; }
Expand Down Expand Up @@ -66,18 +69,29 @@ class CommandLineArguments : public CommandLineArgumentsBase {

std::vector<std::string> const& get_input_paths() const { return m_input_paths; }

std::string const& get_orig_file_id() const { return m_orig_file_id; }

size_t get_ir_msg_ix() const { return m_ir_msg_ix; }

size_t get_ir_target_size() const { return m_ir_target_size; }

GlobalMetadataDBConfig const& get_metadata_db_config() const { return m_metadata_db_config; }

private:
// Methods
void print_basic_usage() const override;
void print_compression_basic_usage() const;
void print_extraction_basic_usage() const;
void print_ir_basic_usage() const;

// Variables
std::string m_path_list_path;
std::string m_path_prefix_to_remove;
std::string m_orig_file_id;
size_t m_ir_msg_ix{0};
size_t m_ir_target_size{128ULL * 1024 * 1024};
bool m_sort_input_files;
std::string m_ir_temp_output_dir;
std::string m_output_dir;
std::string m_schema_file_path;
bool m_show_progress;
Expand Down
169 changes: 0 additions & 169 deletions components/core/src/clp/clp/FileDecompressor.cpp
Original file line number Diff line number Diff line change
@@ -1,59 +1,11 @@
#include "FileDecompressor.hpp"

#include <boost/filesystem.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>

#include "../ir/constants.hpp"
#include "../ir/LogEventSerializer.hpp"
#include "../ir/utils.hpp"

using clp::ir::four_byte_encoded_variable_t;
using clp::ir::LogEventSerializer;
using std::string;

namespace clp::clp {
namespace {
/**
* Renames a temporary IR file and moves it to the output directory.
*
* The new name uses the following format:
* <orig_file_id>_<begin_message_ix>_<end_message_ix>.clp.zst
* @param temp_ir
* @param output_directory
* @param orig_file_id
* @param begin_message_ix
* @param end_message_ix
* @return Whether the IR file is successfully renamed.
*/
bool rename_ir_file(
boost::filesystem::path const& temp_ir_path,
boost::filesystem::path const& output_directory,
string const& file_orig_id,
size_t begin_message_ix,
size_t end_message_ix
) {
auto ir_file_name = file_orig_id;
ir_file_name += "_" + std::to_string(begin_message_ix);
ir_file_name += "_" + std::to_string(end_message_ix);
ir_file_name += ir::cIrFileExtension;

auto const renamed_ir_path = output_directory / ir_file_name;
try {
boost::filesystem::rename(temp_ir_path, renamed_ir_path);
} catch (boost::filesystem::filesystem_error const& e) {
SPDLOG_ERROR(
"Failed to rename from {} to {}. Error: {}",
temp_ir_path.c_str(),
renamed_ir_path.c_str(),
e.what()
);
return false;
}
return true;
}
} // namespace

bool FileDecompressor::decompress_file(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
Expand Down Expand Up @@ -122,125 +74,4 @@ bool FileDecompressor::decompress_file(

return true;
}

bool FileDecompressor::decompress_to_ir(
streaming_archive::MetadataDB::FileIterator const& file_metadata_ix,
string const& output_dir,
string const& temp_output_dir,
streaming_archive::reader::Archive& archive_reader,
size_t ir_target_size
) {
// Open encoded file
if (auto const error_code = archive_reader.open_file(m_encoded_file, file_metadata_ix);
ErrorCode_Success != error_code)
{
if (ErrorCode_errno == error_code) {
SPDLOG_ERROR("Failed to open encoded file, errno={}", errno);
} else {
SPDLOG_ERROR("Failed to open encoded file, error_code={}", error_code);
}
return false;
}

// Generate output directory
if (auto const error_code = create_directory_structure(output_dir, 0700);
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to create directory structure {}, errno={}",
output_dir.c_str(),
errno
);
return false;
}

if (temp_output_dir != output_dir) {
// Generate temporary output directory
if (auto const error_code = create_directory_structure(temp_output_dir, 0700);
ErrorCode_Success != error_code)
{
SPDLOG_ERROR(
"Failed to create directory structure {}, errno={}",
temp_output_dir.c_str(),
errno
);
return false;
}
}

boost::filesystem::path temp_ir_path{temp_output_dir};
auto temp_ir_file = m_encoded_file.get_id_as_string();
temp_ir_file += ir::cIrFileExtension;
temp_ir_path /= temp_ir_file;

auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string();
auto begin_message_ix = m_encoded_file.get_begin_message_ix();

LogEventSerializer<four_byte_encoded_variable_t> ir_serializer;
// Open output IR file
if (false == ir_serializer.open(temp_ir_path.string())) {
SPDLOG_ERROR("Failed to serialize preamble");
return false;
}

while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) {
if (false
== archive_reader
.decompress_message_without_ts(m_encoded_message, m_decompressed_message))
{
SPDLOG_ERROR("Failed to decompress message");
return false;
}

if (ir_serializer.get_serialized_size() >= ir_target_size) {
ir_serializer.close();

auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
if (false
== rename_ir_file(
temp_ir_path,
output_dir,
file_orig_id,
begin_message_ix,
end_message_ix
))
{
return false;
}
begin_message_ix = end_message_ix;

if (false == ir_serializer.open(temp_ir_path.string())) {
SPDLOG_ERROR("Failed to serialize preamble");
return false;
}
}

if (false
== ir_serializer.serialize_log_event(
m_encoded_message.get_ts_in_milli(),
m_decompressed_message
))
{
SPDLOG_ERROR(
"Failed to serialize log event: {} with ts {}",
m_decompressed_message.c_str(),
m_encoded_message.get_ts_in_milli()
);
return false;
}
}
auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events();
ir_serializer.close();

// NOTE: We don't remove temp_output_dir because we don't know if it existed before this method
// was called.
if (false
== rename_ir_file(temp_ir_path, output_dir, file_orig_id, begin_message_ix, end_message_ix))
{
return false;
}

archive_reader.close_file(m_encoded_file);
return true;
}
} // namespace clp::clp
Loading

0 comments on commit d5fcd6b

Please sign in to comment.