diff --git a/components/core/src/clp/clp/CommandLineArguments.cpp b/components/core/src/clp/clp/CommandLineArguments.cpp index 169022524..ccdc99793 100644 --- a/components/core/src/clp/clp/CommandLineArguments.cpp +++ b/components/core/src/clp/clp/CommandLineArguments.cpp @@ -143,9 +143,11 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { cerr << "COMMAND is one of:" << endl; cerr << " c - compress" << endl; cerr << " x - extract" << endl; + cerr << " i - extract IR" << endl; cerr << endl; cerr << "Try " << get_program_name() << " c --help OR " << get_program_name() - << " x --help for command-specific details." << endl; + << " x --help OR " << get_program_name() + << " i --help for command-specific details." << endl; cerr << endl; cerr << "Options can be specified on the command line or through a configuration " @@ -163,6 +165,7 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { switch (command_input) { case (char)Command::Compress: case (char)Command::Extract: + case (char)Command::ExtractIr: m_command = (Command)command_input; break; default: @@ -223,6 +226,95 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { if (m_archives_dir.empty()) { throw invalid_argument("ARCHIVES_DIR cannot be empty."); } + } else if (Command::ExtractIr == m_command) { + // Define IR extraction hidden positional options + po::options_description ir_positional_options; + // clang-format off + ir_positional_options.add_options() + ("archives-dir", po::value(&m_archives_dir)) + ("output-dir", po::value(&m_output_dir)) + ("orig-file-id", po::value(&m_orig_file_id)); + // clang-format on + po::positional_options_description ir_positional_options_description; + ir_positional_options_description.add("archives-dir", 1); + ir_positional_options_description.add("output-dir", 1); + ir_positional_options_description.add("orig-file-id", 1); + + po::options_description options_ir("IR extraction Options"); + options_ir.add_options()( + "msg-ix", + po::value(&m_ir_msg_ix) + ->value_name("INDEX") + ->default_value(m_ir_msg_ix), + "Index of log event that decompressed IR chunks must include" + ); + options_ir.add_options()( + "target-size", + po::value(&m_ir_target_size) + ->value_name("SIZE") + ->default_value(m_ir_target_size), + "Target size (B) for each IR chunk before a new chunk is created" + ); + options_ir.add_options()( + "temp-output-dir", + po::value(&m_ir_temp_output_dir) + ->value_name("DIR") + ->default_value(m_ir_temp_output_dir), + "Temporary output directory for IR chunks while they're being written" + ); + + po::options_description all_ir_options; + all_ir_options.add(ir_positional_options); + all_ir_options.add(options_ir); + + // Parse IR extraction options + vector unrecognized_options + = po::collect_unrecognized(parsed.options, po::include_positional); + unrecognized_options.erase(unrecognized_options.begin()); + po::store( + po::command_line_parser(unrecognized_options) + .options(all_ir_options) + .positional(ir_positional_options_description) + .run(), + parsed_command_line_options + ); + + notify(parsed_command_line_options); + + // Handle --help + if (parsed_command_line_options.count("help")) { + print_ir_basic_usage(); + + cerr << "Examples:" << endl; + cerr << " # Extract (original) file with ID 8cf8d8f2-bf3f-42a2-90b2-6bc4ed0a36b4" + " as IR" + << endl; + cerr << " " << get_program_name() + << " i archives-dir output-dir 8cf8d8f2-bf3f-42a2-90b2-6bc4ed0a36b4" << endl; + cerr << endl; + + po::options_description visible_options; + visible_options.add(options_general); + visible_options.add(options_ir); + cerr << visible_options << endl; + return ParsingResult::InfoCommand; + } + + if (m_archives_dir.empty()) { + throw invalid_argument("ARCHIVES_DIR cannot be empty."); + } + + if (m_output_dir.empty()) { + throw invalid_argument("OUTPUT_DIR cannot be empty."); + } + + if (m_orig_file_id.empty()) { + throw invalid_argument("ORIG_FILE_ID cannot be empty."); + } + + if (m_ir_temp_output_dir.empty()) { + m_ir_temp_output_dir = m_output_dir; + } } else if (Command::Compress == m_command) { // Define compression hidden positional options po::options_description compression_positional_options; @@ -403,4 +495,9 @@ void CommandLineArguments::print_extraction_basic_usage() const { cerr << "Usage: " << get_program_name() << " [OPTIONS] x ARCHIVES_DIR OUTPUT_DIR [FILE ...]" << endl; } + +void CommandLineArguments::print_ir_basic_usage() const { + cerr << "Usage: " << get_program_name() << " [OPTIONS] i ARCHIVES_DIR OUTPUT_DIR ORIG_FILE_ID" + << endl; +} } // namespace clp::clp diff --git a/components/core/src/clp/clp/CommandLineArguments.hpp b/components/core/src/clp/clp/CommandLineArguments.hpp index 2634b51eb..b9cf15740 100644 --- a/components/core/src/clp/clp/CommandLineArguments.hpp +++ b/components/core/src/clp/clp/CommandLineArguments.hpp @@ -16,6 +16,7 @@ class CommandLineArguments : public CommandLineArgumentsBase { enum class Command : char { Compress = 'c', Extract = 'x', + ExtractIr = 'i' }; // Constructors @@ -36,6 +37,8 @@ class CommandLineArguments : public CommandLineArgumentsBase { std::string const& get_path_prefix_to_remove() const { return m_path_prefix_to_remove; } + std::string const& get_ir_temp_output_dir() const { return m_ir_temp_output_dir; } + std::string const& get_output_dir() const { return m_output_dir; } std::string const& get_schema_file_path() const { return m_schema_file_path; } @@ -66,6 +69,12 @@ class CommandLineArguments : public CommandLineArgumentsBase { std::vector const& get_input_paths() const { return m_input_paths; } + std::string const& get_orig_file_id() const { return m_orig_file_id; } + + size_t get_ir_msg_ix() const { return m_ir_msg_ix; } + + size_t get_ir_target_size() const { return m_ir_target_size; } + GlobalMetadataDBConfig const& get_metadata_db_config() const { return m_metadata_db_config; } private: @@ -73,11 +82,16 @@ class CommandLineArguments : public CommandLineArgumentsBase { void print_basic_usage() const override; void print_compression_basic_usage() const; void print_extraction_basic_usage() const; + void print_ir_basic_usage() const; // Variables std::string m_path_list_path; std::string m_path_prefix_to_remove; + std::string m_orig_file_id; + size_t m_ir_msg_ix{0}; + size_t m_ir_target_size{128ULL * 1024 * 1024}; bool m_sort_input_files; + std::string m_ir_temp_output_dir; std::string m_output_dir; std::string m_schema_file_path; bool m_show_progress; diff --git a/components/core/src/clp/clp/FileDecompressor.cpp b/components/core/src/clp/clp/FileDecompressor.cpp index 327bfc1ff..5e023c2e1 100644 --- a/components/core/src/clp/clp/FileDecompressor.cpp +++ b/components/core/src/clp/clp/FileDecompressor.cpp @@ -1,59 +1,11 @@ #include "FileDecompressor.hpp" -#include #include #include -#include "../ir/constants.hpp" -#include "../ir/LogEventSerializer.hpp" -#include "../ir/utils.hpp" - -using clp::ir::four_byte_encoded_variable_t; -using clp::ir::LogEventSerializer; using std::string; namespace clp::clp { -namespace { -/** - * Renames a temporary IR file and moves it to the output directory. - * - * The new name uses the following format: - * __.clp.zst - * @param temp_ir - * @param output_directory - * @param orig_file_id - * @param begin_message_ix - * @param end_message_ix - * @return Whether the IR file is successfully renamed. - */ -bool rename_ir_file( - boost::filesystem::path const& temp_ir_path, - boost::filesystem::path const& output_directory, - string const& file_orig_id, - size_t begin_message_ix, - size_t end_message_ix -) { - auto ir_file_name = file_orig_id; - ir_file_name += "_" + std::to_string(begin_message_ix); - ir_file_name += "_" + std::to_string(end_message_ix); - ir_file_name += ir::cIrFileExtension; - - auto const renamed_ir_path = output_directory / ir_file_name; - try { - boost::filesystem::rename(temp_ir_path, renamed_ir_path); - } catch (boost::filesystem::filesystem_error const& e) { - SPDLOG_ERROR( - "Failed to rename from {} to {}. Error: {}", - temp_ir_path.c_str(), - renamed_ir_path.c_str(), - e.what() - ); - return false; - } - return true; -} -} // namespace - bool FileDecompressor::decompress_file( streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, string const& output_dir, @@ -122,125 +74,4 @@ bool FileDecompressor::decompress_file( return true; } - -bool FileDecompressor::decompress_to_ir( - streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, - string const& output_dir, - string const& temp_output_dir, - streaming_archive::reader::Archive& archive_reader, - size_t ir_target_size -) { - // Open encoded file - if (auto const error_code = archive_reader.open_file(m_encoded_file, file_metadata_ix); - ErrorCode_Success != error_code) - { - if (ErrorCode_errno == error_code) { - SPDLOG_ERROR("Failed to open encoded file, errno={}", errno); - } else { - SPDLOG_ERROR("Failed to open encoded file, error_code={}", error_code); - } - return false; - } - - // Generate output directory - if (auto const error_code = create_directory_structure(output_dir, 0700); - ErrorCode_Success != error_code) - { - SPDLOG_ERROR( - "Failed to create directory structure {}, errno={}", - output_dir.c_str(), - errno - ); - return false; - } - - if (temp_output_dir != output_dir) { - // Generate temporary output directory - if (auto const error_code = create_directory_structure(temp_output_dir, 0700); - ErrorCode_Success != error_code) - { - SPDLOG_ERROR( - "Failed to create directory structure {}, errno={}", - temp_output_dir.c_str(), - errno - ); - return false; - } - } - - boost::filesystem::path temp_ir_path{temp_output_dir}; - auto temp_ir_file = m_encoded_file.get_id_as_string(); - temp_ir_file += ir::cIrFileExtension; - temp_ir_path /= temp_ir_file; - - auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string(); - auto begin_message_ix = m_encoded_file.get_begin_message_ix(); - - LogEventSerializer ir_serializer; - // Open output IR file - if (false == ir_serializer.open(temp_ir_path.string())) { - SPDLOG_ERROR("Failed to serialize preamble"); - return false; - } - - while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) { - if (false - == archive_reader - .decompress_message_without_ts(m_encoded_message, m_decompressed_message)) - { - SPDLOG_ERROR("Failed to decompress message"); - return false; - } - - if (ir_serializer.get_serialized_size() >= ir_target_size) { - ir_serializer.close(); - - auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events(); - if (false - == rename_ir_file( - temp_ir_path, - output_dir, - file_orig_id, - begin_message_ix, - end_message_ix - )) - { - return false; - } - begin_message_ix = end_message_ix; - - if (false == ir_serializer.open(temp_ir_path.string())) { - SPDLOG_ERROR("Failed to serialize preamble"); - return false; - } - } - - if (false - == ir_serializer.serialize_log_event( - m_encoded_message.get_ts_in_milli(), - m_decompressed_message - )) - { - SPDLOG_ERROR( - "Failed to serialize log event: {} with ts {}", - m_decompressed_message.c_str(), - m_encoded_message.get_ts_in_milli() - ); - return false; - } - } - auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events(); - ir_serializer.close(); - - // NOTE: We don't remove temp_output_dir because we don't know if it existed before this method - // was called. - if (false - == rename_ir_file(temp_ir_path, output_dir, file_orig_id, begin_message_ix, end_message_ix)) - { - return false; - } - - archive_reader.close_file(m_encoded_file); - return true; -} } // namespace clp::clp diff --git a/components/core/src/clp/clp/FileDecompressor.hpp b/components/core/src/clp/clp/FileDecompressor.hpp index a31ea705b..6bcf86829 100644 --- a/components/core/src/clp/clp/FileDecompressor.hpp +++ b/components/core/src/clp/clp/FileDecompressor.hpp @@ -1,14 +1,23 @@ #ifndef CLP_CLP_FILEDECOMPRESSOR_HPP #define CLP_CLP_FILEDECOMPRESSOR_HPP +#include #include #include +#include + #include "../FileWriter.hpp" +#include "../ir/constants.hpp" +#include "../ir/LogEventSerializer.hpp" +#include "../spdlog_with_specializations.hpp" #include "../streaming_archive/MetadataDB.hpp" #include "../streaming_archive/reader/Archive.hpp" #include "../streaming_archive/reader/File.hpp" #include "../streaming_archive/reader/Message.hpp" +#include "ErrorCode.hpp" +#include "ir/types.hpp" +#include "Utils.hpp" namespace clp::clp { /** @@ -25,13 +34,29 @@ class FileDecompressor { std::unordered_map& temp_path_to_final_path ); - bool decompress_to_ir( + /** + * Decompresses the given file split into one or more IR files (chunks). The function creates a + * new IR chunk when the current IR chunk exceeds ir_target_size. + * + * @tparam IrOutputHandler Function to handle the resulting IR chunks. + * Signature: (boost::filesystem::path const& ir_file_path, string const& orig_file_id, + * size_t begin_message_ix, size_t end_message_ix) -> bool; + * The function returns whether it succeeded. + * @param archive_reader + * @param file_metadata_ix + * @param ir_target_size Target size of each IR chunk. NOTE: This is not a hard limit. + * @param output_dir Directory to write IR chunks to + * @param ir_output_handler + * @return Whether decompression was successful. + */ + template + auto decompress_to_ir( + streaming_archive::reader::Archive& archive_reader, streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, + size_t ir_target_size, std::string const& output_dir, - std::string const& temp_output_dir, - streaming_archive::reader::Archive& archive_reader, - size_t ir_target_size - ); + IrOutputHandler ir_output_handler + ) -> bool; private: // Variables @@ -40,6 +65,111 @@ class FileDecompressor { streaming_archive::reader::Message m_encoded_message; std::string m_decompressed_message; }; + +// Templated methods +template +auto FileDecompressor::decompress_to_ir( + streaming_archive::reader::Archive& archive_reader, + streaming_archive::MetadataDB::FileIterator const& file_metadata_ix, + size_t ir_target_size, + std::string const& output_dir, + IrOutputHandler ir_output_handler +) -> bool { + // Open encoded file + if (auto const error_code = archive_reader.open_file(m_encoded_file, file_metadata_ix); + ErrorCode_Success != error_code) + { + if (ErrorCode_errno == error_code) { + SPDLOG_ERROR("Failed to open encoded file, errno={}", errno); + } else { + SPDLOG_ERROR("Failed to open encoded file, error_code={}", error_code); + } + return false; + } + + // Generate output directory + if (auto const error_code = create_directory_structure(output_dir, 0700); + ErrorCode_Success != error_code) + { + SPDLOG_ERROR( + "Failed to create directory structure {}, errno={}", + output_dir.c_str(), + errno + ); + return false; + } + + boost::filesystem::path ir_output_path{output_dir}; + auto ir_file_name = m_encoded_file.get_id_as_string(); + ir_file_name += ir::cIrFileExtension; + ir_output_path /= ir_file_name; + + auto const& file_orig_id = m_encoded_file.get_orig_file_id_as_string(); + auto begin_message_ix = m_encoded_file.get_begin_message_ix(); + + ir::LogEventSerializer ir_serializer; + // Open output IR file + if (false == ir_serializer.open(ir_output_path.string())) { + SPDLOG_ERROR("Failed to serialize preamble"); + return false; + } + + while (archive_reader.get_next_message(m_encoded_file, m_encoded_message)) { + if (false + == archive_reader + .decompress_message_without_ts(m_encoded_message, m_decompressed_message)) + { + SPDLOG_ERROR("Failed to decompress message"); + return false; + } + + if (ir_serializer.get_serialized_size() >= ir_target_size) { + ir_serializer.close(); + + auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events(); + if (false + == ir_output_handler( + ir_output_path, + file_orig_id, + begin_message_ix, + end_message_ix + )) + { + return false; + } + begin_message_ix = end_message_ix; + + if (false == ir_serializer.open(ir_output_path.string())) { + SPDLOG_ERROR("Failed to serialize preamble"); + return false; + } + } + + if (false + == ir_serializer.serialize_log_event( + m_encoded_message.get_ts_in_milli(), + m_decompressed_message + )) + { + SPDLOG_ERROR( + "Failed to serialize log event: {} with ts {}", + m_decompressed_message.c_str(), + m_encoded_message.get_ts_in_milli() + ); + return false; + } + } + auto const end_message_ix = begin_message_ix + ir_serializer.get_num_log_events(); + ir_serializer.close(); + + if (false == ir_output_handler(ir_output_path, file_orig_id, begin_message_ix, end_message_ix)) + { + return false; + } + + archive_reader.close_file(m_encoded_file); + return true; +} }; // namespace clp::clp #endif // CLP_CLP_FILEDECOMPRESSOR_HPP diff --git a/components/core/src/clp/clp/decompression.cpp b/components/core/src/clp/clp/decompression.cpp index c3340e570..5eb8fc898 100644 --- a/components/core/src/clp/clp/decompression.cpp +++ b/components/core/src/clp/clp/decompression.cpp @@ -14,6 +14,8 @@ #include "../TraceableException.hpp" #include "../Utils.hpp" #include "FileDecompressor.hpp" +#include "ir/constants.hpp" +#include "utils.hpp" using std::cerr; using std::make_unique; @@ -41,27 +43,7 @@ bool decompress( try { auto archives_dir = boost::filesystem::path(command_line_args.get_archives_dir()); auto const& global_metadata_db_config = command_line_args.get_metadata_db_config(); - std::unique_ptr global_metadata_db; - switch (global_metadata_db_config.get_metadata_db_type()) { - case GlobalMetadataDBConfig::MetadataDBType::SQLite: { - auto global_metadata_db_path - = archives_dir / streaming_archive::cMetadataDBFileName; - global_metadata_db - = std::make_unique(global_metadata_db_path.string() - ); - break; - } - case GlobalMetadataDBConfig::MetadataDBType::MySQL: - global_metadata_db = std::make_unique( - global_metadata_db_config.get_metadata_db_host(), - global_metadata_db_config.get_metadata_db_port(), - global_metadata_db_config.get_metadata_db_username(), - global_metadata_db_config.get_metadata_db_password(), - global_metadata_db_config.get_metadata_db_name(), - global_metadata_db_config.get_metadata_table_prefix() - ); - break; - } + auto global_metadata_db = get_global_metadata_db(global_metadata_db_config, archives_dir); streaming_archive::reader::Archive archive_reader; @@ -251,4 +233,117 @@ bool decompress( return true; } + +bool decompress_to_ir(CommandLineArguments& command_line_args) { + ErrorCode error_code{}; + + // Create output directory in case it doesn't exist + auto output_dir = boost::filesystem::path(command_line_args.get_output_dir()); + error_code = create_directory(output_dir.parent_path().string(), 0700, true); + if (ErrorCode_Success != error_code) { + SPDLOG_ERROR("Failed to create {} - {}", output_dir.parent_path().c_str(), strerror(errno)); + return false; + } + + try { + auto archives_dir = boost::filesystem::path(command_line_args.get_archives_dir()); + auto const& global_metadata_db_config = command_line_args.get_metadata_db_config(); + auto global_metadata_db = get_global_metadata_db(global_metadata_db_config, archives_dir); + + global_metadata_db->open(); + string archive_id; + string file_split_id; + if (false + == global_metadata_db->get_file_split( + command_line_args.get_orig_file_id(), + command_line_args.get_ir_msg_ix(), + archive_id, + file_split_id + )) + { + SPDLOG_ERROR( + "Failed to find file split containing msg_ix {}", + command_line_args.get_ir_msg_ix() + ); + return false; + } + global_metadata_db->close(); + + streaming_archive::reader::Archive archive_reader; + auto archive_path = archives_dir / archive_id; + archive_reader.open(archive_path.string()); + archive_reader.refresh_dictionaries(); + + auto file_metadata_ix_ptr = archive_reader.get_file_iterator_by_split_id(file_split_id); + if (false == file_metadata_ix_ptr->has_next()) { + SPDLOG_ERROR("File split doesn't exist {} in archive {}", file_split_id, archive_id); + return false; + } + + auto ir_output_handler = [&](boost::filesystem::path const& src_ir_path, + string const& orig_file_id, + size_t begin_message_ix, + size_t end_message_ix) { + auto dest_ir_file_name = orig_file_id; + dest_ir_file_name += "_" + std::to_string(begin_message_ix); + dest_ir_file_name += "_" + std::to_string(end_message_ix); + dest_ir_file_name += ir::cIrFileExtension; + + auto const dest_ir_path = output_dir / dest_ir_file_name; + try { + boost::filesystem::rename(src_ir_path, dest_ir_path); + } catch (boost::filesystem::filesystem_error const& e) { + SPDLOG_ERROR( + "Failed to rename from {} to {}. Error: {}", + src_ir_path.c_str(), + dest_ir_path.c_str(), + e.what() + ); + return false; + } + return true; + }; + + // Decompress file split + FileDecompressor file_decompressor; + if (false + == file_decompressor.decompress_to_ir( + archive_reader, + *file_metadata_ix_ptr, + command_line_args.get_ir_target_size(), + command_line_args.get_ir_temp_output_dir(), + ir_output_handler + )) + { + return false; + } + + file_metadata_ix_ptr.reset(nullptr); + + archive_reader.close(); + } catch (TraceableException& e) { + error_code = e.get_error_code(); + if (ErrorCode_errno == error_code) { + SPDLOG_ERROR( + "Decompression failed: {}:{} {}, errno={}", + e.get_filename(), + e.get_line_number(), + e.what(), + errno + ); + return false; + } else { + SPDLOG_ERROR( + "Decompression failed: {}:{} {}, error_code={}", + e.get_filename(), + e.get_line_number(), + e.what(), + error_code + ); + return false; + } + } + + return true; +} } // namespace clp::clp diff --git a/components/core/src/clp/clp/decompression.hpp b/components/core/src/clp/clp/decompression.hpp index 60c5270ec..c5043ae2e 100644 --- a/components/core/src/clp/clp/decompression.hpp +++ b/components/core/src/clp/clp/decompression.hpp @@ -17,6 +17,12 @@ bool decompress( CommandLineArguments& command_line_args, std::unordered_set const& files_to_decompress ); +/** + * Decompresses a file split from an archive into one or more IR chunks in the the given directory. + * @param command_line_args + * @return Whether decompression was successful. + */ +bool decompress_to_ir(CommandLineArguments& command_line_args); } // namespace clp::clp #endif // CLP_CLP_DECOMPRESSION_HPP diff --git a/components/core/src/clp/clp/run.cpp b/components/core/src/clp/clp/run.cpp index 1eb9e2f8a..5a3b0eb27 100644 --- a/components/core/src/clp/clp/run.cpp +++ b/components/core/src/clp/clp/run.cpp @@ -54,7 +54,8 @@ int run(int argc, char const* argv[]) { } } - if (CommandLineArguments::Command::Compress == command_line_args.get_command()) { + auto command = command_line_args.get_command(); + if (CommandLineArguments::Command::Compress == command) { /// TODO: make this not a unique_ptr and test performance difference std::unique_ptr reader_parser; if (!command_line_args.get_use_heuristic()) { @@ -134,11 +135,18 @@ int run(int argc, char const* argv[]) { if (!compression_successful) { return -1; } - } else { // CommandLineArguments::Command::Extract == command + } else if (CommandLineArguments::Command::Extract == command) { unordered_set files_to_decompress(input_paths.cbegin(), input_paths.cend()); - if (!decompress(command_line_args, files_to_decompress)) { + if (false == decompress(command_line_args, files_to_decompress)) { return -1; } + } else if (CommandLineArguments::Command::ExtractIr == command) { + if (false == decompress_to_ir(command_line_args)) { + return -1; + } + } else { + SPDLOG_ERROR("Command {} not implemented.", enum_to_underlying_type(command)); + return -1; } Profiler::stop_continuous_measurement(); diff --git a/components/core/src/clp/clp/utils.cpp b/components/core/src/clp/clp/utils.cpp index 715eee858..77ee4c344 100644 --- a/components/core/src/clp/clp/utils.cpp +++ b/components/core/src/clp/clp/utils.cpp @@ -1,12 +1,16 @@ #include "utils.hpp" -#include +#include #include #include "../ErrorCode.hpp" +#include "../GlobalMySQLMetadataDB.hpp" +#include "../GlobalSQLiteMetadataDB.hpp" #include "../spdlog_with_specializations.hpp" #include "../Utils.hpp" +#include "streaming_archive/Constants.hpp" +#include "TraceableException.hpp" using std::string; using std::vector; @@ -200,4 +204,29 @@ bool validate_paths_exist(vector const& paths) { return all_paths_exist; } + +std::unique_ptr get_global_metadata_db( + GlobalMetadataDBConfig const& global_metadata_db_config, + boost::filesystem::path const& archives_dir +) { + switch (global_metadata_db_config.get_metadata_db_type()) { + case GlobalMetadataDBConfig::MetadataDBType::SQLite: { + auto global_metadata_db_path + = archives_dir + / static_cast(streaming_archive::cMetadataDBFileName); + return std::make_unique(global_metadata_db_path.string()); + } + case GlobalMetadataDBConfig::MetadataDBType::MySQL: + return std::make_unique( + global_metadata_db_config.get_metadata_db_host(), + global_metadata_db_config.get_metadata_db_port(), + global_metadata_db_config.get_metadata_db_username(), + global_metadata_db_config.get_metadata_db_password(), + global_metadata_db_config.get_metadata_db_name(), + global_metadata_db_config.get_metadata_table_prefix() + ); + default: + throw ClpOperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__); + } +} } // namespace clp::clp diff --git a/components/core/src/clp/clp/utils.hpp b/components/core/src/clp/clp/utils.hpp index a53277572..4f58e1ee4 100644 --- a/components/core/src/clp/clp/utils.hpp +++ b/components/core/src/clp/clp/utils.hpp @@ -1,13 +1,30 @@ #ifndef CLP_CLP_UTILS_HPP #define CLP_CLP_UTILS_HPP +#include #include #include +#include "../GlobalMetadataDB.hpp" +#include "../GlobalMetadataDBConfig.hpp" +#include "ErrorCode.hpp" #include "FileToCompress.hpp" +#include "TraceableException.hpp" namespace clp::clp { +// Types +class ClpOperationFailed : public TraceableException { +public: + // Constructors + ClpOperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + + // Methods + [[nodiscard]] char const* what() const noexcept override { return "CLP operation failed"; } +}; + +// Methods /** * Recursively finds all files and empty directories at the given path * @param path_prefix_to_remove @@ -61,6 +78,17 @@ bool remove_prefix_and_clean_up_path( * @return true if they all exist, false otherwise */ bool validate_paths_exist(std::vector const& paths); + +/** + * Chooses and initializes the relevant global metadata DB class based on the given config. + * @param global_metadata_db_config + * @param archives_dir + * @return The relevant global metadata DB class. + */ +std::unique_ptr get_global_metadata_db( + GlobalMetadataDBConfig const& global_metadata_db_config, + boost::filesystem::path const& archives_dir +); } // namespace clp::clp #endif // CLP_CLP_UTILS_HPP