Skip to content

Commit

Permalink
feat(clp-s): Add the write path for single-file archives. (#563)
Browse files Browse the repository at this point in the history
Co-authored-by: Devin Gibson <[email protected]>
  • Loading branch information
wraymo and gibber9809 authored Nov 27, 2024
1 parent b8e22da commit f1876cd
Show file tree
Hide file tree
Showing 14 changed files with 359 additions and 115 deletions.
192 changes: 169 additions & 23 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "ArchiveWriter.hpp"

#include <algorithm>
#include <filesystem>
#include <sstream>

#include <json/single_include/nlohmann/json.hpp>

Expand All @@ -13,18 +15,23 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {
m_id = boost::uuids::to_string(option.id);
m_compression_level = option.compression_level;
m_print_archive_stats = option.print_archive_stats;
m_single_file_archive = option.single_file_archive;
m_min_table_size = option.min_table_size;
auto archive_path = boost::filesystem::path(option.archives_dir) / m_id;
m_archives_dir = option.archives_dir;
std::string working_dir_name = m_id;
if (option.single_file_archive) {
working_dir_name += constants::cTmpPostfix;
}
auto archive_path = std::filesystem::path(option.archives_dir) / working_dir_name;

boost::system::error_code boost_error_code;
bool path_exists = boost::filesystem::exists(archive_path, boost_error_code);
if (path_exists) {
std::error_code ec;
if (std::filesystem::exists(archive_path, ec)) {
SPDLOG_ERROR("Archive path already exists: {}", archive_path.c_str());
throw OperationFailed(ErrorCodeUnsupported, __FILENAME__, __LINE__);
}

m_archive_path = archive_path.string();
if (false == boost::filesystem::create_directory(m_archive_path)) {
if (false == std::filesystem::create_directory(m_archive_path, ec)) {
throw OperationFailed(ErrorCodeErrno, __FILENAME__, __LINE__);
}

Expand All @@ -39,20 +46,42 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {
std::string array_dict_path = m_archive_path + constants::cArchiveArrayDictFile;
m_array_dict = std::make_shared<LogTypeDictionaryWriter>();
m_array_dict->open(array_dict_path, m_compression_level, UINT64_MAX);

std::string timestamp_dict_path = m_archive_path + constants::cArchiveTimestampDictFile;
m_timestamp_dict = std::make_shared<TimestampDictionaryWriter>();
m_timestamp_dict->open(timestamp_dict_path, m_compression_level);
}

void ArchiveWriter::close() {
m_compressed_size += m_var_dict->close();
m_compressed_size += m_log_dict->close();
m_compressed_size += m_array_dict->close();
m_compressed_size += m_timestamp_dict->close();
m_compressed_size += m_schema_tree.store(m_archive_path, m_compression_level);
m_compressed_size += m_schema_map.store(m_archive_path, m_compression_level);
m_compressed_size += store_tables();
auto var_dict_compressed_size = m_var_dict->close();
auto log_dict_compressed_size = m_log_dict->close();
auto array_dict_compressed_size = m_array_dict->close();
auto schema_tree_compressed_size = m_schema_tree.store(m_archive_path, m_compression_level);
auto schema_map_compressed_size = m_schema_map.store(m_archive_path, m_compression_level);
auto [table_metadata_compressed_size, table_compressed_size] = store_tables();

if (m_single_file_archive) {
std::vector<ArchiveFileInfo> files{
{constants::cArchiveSchemaTreeFile, schema_tree_compressed_size},
{constants::cArchiveSchemaMapFile, schema_map_compressed_size},
{constants::cArchiveTableMetadataFile, table_metadata_compressed_size},
{constants::cArchiveVarDictFile, var_dict_compressed_size},
{constants::cArchiveLogDictFile, log_dict_compressed_size},
{constants::cArchiveArrayDictFile, array_dict_compressed_size},
{constants::cArchiveTablesFile, table_compressed_size}
};
uint64_t offset = 0;
for (auto& file : files) {
uint64_t original_size = file.o;
file.o = offset;
offset += original_size;
}
write_single_file_archive(files);
} else {
// Timestamp dictionary written separately here until we transition to moving it inside of
// the metadata region of multi-file archives.
auto timestamp_dict_compressed_size = write_timestamp_dict();
m_compressed_size = var_dict_compressed_size + log_dict_compressed_size
+ array_dict_compressed_size + timestamp_dict_compressed_size
+ schema_tree_compressed_size + schema_map_compressed_size
+ table_metadata_compressed_size + table_compressed_size;
}

if (m_metadata_db) {
update_metadata_db();
Expand All @@ -65,12 +94,130 @@ void ArchiveWriter::close() {
m_id_to_schema_writer.clear();
m_schema_tree.clear();
m_schema_map.clear();
m_timestamp_dict.clear();
m_encoded_message_size = 0UL;
m_uncompressed_size = 0UL;
m_compressed_size = 0UL;
m_next_log_event_id = 0;
}

size_t ArchiveWriter::write_timestamp_dict() {
std::string timestamp_dict_path = m_archive_path + constants::cArchiveTimestampDictFile;
FileWriter timestamp_dict_file_writer;
ZstdCompressor timestamp_dict_compressor;
timestamp_dict_file_writer.open(timestamp_dict_path, FileWriter::OpenMode::CreateForWriting);
timestamp_dict_compressor.open(timestamp_dict_file_writer, m_compression_level);
std::stringstream timestamp_dict_stream;
m_timestamp_dict.write(timestamp_dict_stream);
std::string encoded_timestamp_dict = timestamp_dict_stream.str();
timestamp_dict_compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());
timestamp_dict_compressor.close();
auto compressed_size = timestamp_dict_file_writer.get_pos();
timestamp_dict_file_writer.close();
return compressed_size;
}

void ArchiveWriter::write_single_file_archive(std::vector<ArchiveFileInfo> const& files) {
std::string single_file_archive_path = (std::filesystem::path(m_archives_dir) / m_id).string();
FileWriter archive_writer;
archive_writer.open(single_file_archive_path, FileWriter::OpenMode::CreateForWriting);

write_archive_metadata(archive_writer, files);
size_t metadata_section_size = archive_writer.get_pos() - sizeof(ArchiveHeader);
write_archive_files(archive_writer, files);
m_compressed_size = archive_writer.get_pos();
write_archive_header(archive_writer, metadata_section_size);

archive_writer.close();
std::error_code ec;
if (false == std::filesystem::remove(m_archive_path, ec)) {
throw OperationFailed(ErrorCodeFileExists, __FILENAME__, __LINE__);
}
}

void ArchiveWriter::write_archive_metadata(
FileWriter& archive_writer,
std::vector<ArchiveFileInfo> const& files
) {
archive_writer.seek_from_begin(sizeof(ArchiveHeader));

ZstdCompressor compressor;
compressor.open(archive_writer, m_compression_level);
compressor.write_numeric_value(static_cast<uint8_t>(3U)); // Number of packets

// Write archive info
ArchiveInfoPacket archive_info{.num_segments = 1};
std::stringstream msgpack_buffer;
msgpack::pack(msgpack_buffer, archive_info);
std::string archive_info_str = msgpack_buffer.str();
compressor.write_numeric_value(ArchiveMetadataPacketType::ArchiveInfo);
compressor.write_numeric_value(static_cast<uint32_t>(archive_info_str.size()));
compressor.write_string(archive_info_str);

// Write archive file info
ArchiveFileInfoPacket archive_file_info{.files{files}};
msgpack_buffer = std::stringstream{};
msgpack::pack(msgpack_buffer, archive_file_info);
std::string archive_file_info_str = msgpack_buffer.str();
compressor.write_numeric_value(ArchiveMetadataPacketType::ArchiveFileInfo);
compressor.write_numeric_value(static_cast<uint32_t>(archive_file_info_str.size()));
compressor.write_string(archive_file_info_str);

// Write timestamp dictionary
compressor.write_numeric_value(ArchiveMetadataPacketType::TimestampDictionary);
std::stringstream timestamp_dict_stream;
m_timestamp_dict.write(timestamp_dict_stream);
std::string encoded_timestamp_dict = timestamp_dict_stream.str();
compressor.write_numeric_value(static_cast<uint32_t>(encoded_timestamp_dict.size()));
compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());

compressor.close();
}

void ArchiveWriter::write_archive_files(
FileWriter& archive_writer,
std::vector<ArchiveFileInfo> const& files
) {
FileReader reader;
for (auto const& file : files) {
std::string file_path = m_archive_path + file.n;
reader.open(file_path);
char read_buffer[cReadBlockSize];
while (true) {
size_t num_bytes_read{0};
ErrorCode const error_code
= reader.try_read(read_buffer, cReadBlockSize, num_bytes_read);
if (ErrorCodeEndOfFile == error_code) {
break;
} else if (ErrorCodeSuccess != error_code) {
throw OperationFailed(error_code, __FILENAME__, __LINE__);
}
archive_writer.write(read_buffer, num_bytes_read);
}
reader.close();
if (false == std::filesystem::remove(file_path)) {
throw OperationFailed(ErrorCodeFileExists, __FILENAME__, __LINE__);
}
}
}

void ArchiveWriter::write_archive_header(FileWriter& archive_writer, size_t metadata_section_size) {
ArchiveHeader header{
.magic_number{0},
.version
= (cArchiveMajorVersion << 24) | (cArchiveMinorVersion << 16) | cArchivePatchVersion,
.uncompressed_size = m_uncompressed_size,
.compressed_size = m_compressed_size,
.reserved_padding{0},
.metadata_section_size = static_cast<uint32_t>(metadata_section_size),
.compression_type = static_cast<uint16_t>(ArchiveCompressionType::Zstd),
.padding = 0
};
std::memcpy(&header.magic_number, cStructuredSFAMagicNumber, sizeof(header.magic_number));
archive_writer.seek_from_begin(0);
archive_writer.write(reinterpret_cast<char const*>(&header), sizeof(header));
}

void ArchiveWriter::append_message(
int32_t schema_id,
Schema const& schema,
Expand Down Expand Up @@ -132,8 +279,7 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
}
}

size_t ArchiveWriter::store_tables() {
size_t compressed_size = 0;
std::pair<size_t, size_t> ArchiveWriter::store_tables() {
m_tables_file_writer.open(
m_archive_path + constants::cArchiveTablesFile,
FileWriter::OpenMode::CreateForWriting
Expand Down Expand Up @@ -243,13 +389,13 @@ size_t ArchiveWriter::store_tables() {
}
m_table_metadata_compressor.close();

compressed_size += m_table_metadata_file_writer.get_pos();
compressed_size += m_tables_file_writer.get_pos();
auto table_metadata_compressed_size = m_table_metadata_file_writer.get_pos();
auto table_compressed_size = m_tables_file_writer.get_pos();

m_table_metadata_file_writer.close();
m_tables_file_writer.close();

return compressed_size;
return {table_metadata_compressed_size, table_compressed_size};
}

void ArchiveWriter::update_metadata_db() {
Expand All @@ -262,8 +408,8 @@ void ArchiveWriter::update_metadata_db() {
metadata.increment_static_compressed_size(m_compressed_size);
metadata.increment_static_uncompressed_size(m_uncompressed_size);
metadata.expand_time_range(
m_timestamp_dict->get_begin_timestamp(),
m_timestamp_dict->get_end_timestamp()
m_timestamp_dict.get_begin_timestamp(),
m_timestamp_dict.get_end_timestamp()
);

m_metadata_db->add_archive(m_id, metadata);
Expand Down
66 changes: 57 additions & 9 deletions components/core/src/clp_s/ArchiveWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "SchemaMap.hpp"
#include "SchemaTree.hpp"
#include "SchemaWriter.hpp"
#include "SingleFileArchiveDefs.hpp"
#include "TimestampDictionaryWriter.hpp"

namespace clp_s {
Expand All @@ -22,6 +23,7 @@ struct ArchiveWriterOption {
std::string archives_dir;
int compression_level;
bool print_archive_stats;
bool single_file_archive;
size_t min_table_size;
};

Expand Down Expand Up @@ -125,7 +127,7 @@ class ArchiveWriter {
std::string const& timestamp,
uint64_t& pattern_id
) {
return m_timestamp_dict->ingest_entry(key, node_id, timestamp, pattern_id);
return m_timestamp_dict.ingest_entry(key, node_id, timestamp, pattern_id);
}

/**
Expand All @@ -135,21 +137,24 @@ class ArchiveWriter {
* @param timestamp
*/
void ingest_timestamp_entry(std::string const& key, int32_t node_id, double timestamp) {
m_timestamp_dict->ingest_entry(key, node_id, timestamp);
m_timestamp_dict.ingest_entry(key, node_id, timestamp);
}

void ingest_timestamp_entry(std::string const& key, int32_t node_id, int64_t timestamp) {
m_timestamp_dict->ingest_entry(key, node_id, timestamp);
m_timestamp_dict.ingest_entry(key, node_id, timestamp);
}

/**
* Increments the size of the compressed data written to the archive
* Increments the size of the original (uncompressed) logs ingested into the archive. This size
* tracks the raw input size before any encoding or compression.
* @param size
*/
void increment_uncompressed_size(size_t size) { m_uncompressed_size += size; }

/**
* @return Size of the uncompressed data written to the archive
* @return The total size of the encoded (uncompressed) data written to the archive. This
* reflects the size of the data after encoding but before compression.
* TODO: Add the size of schema tree, schema map and timestamp dictionary
*/
size_t get_data_size();

Expand All @@ -162,10 +167,40 @@ class ArchiveWriter {
void initialize_schema_writer(SchemaWriter* writer, Schema const& schema);

/**
* Stores the tables
* @return Size of the compressed data in bytes
* Compresses and stores the tables.
* @return A pair containing:
* - The size of the compressed table metadata in bytes.
* - The size of the compressed tables in bytes.
*/
[[nodiscard]] size_t store_tables();
[[nodiscard]] std::pair<size_t, size_t> store_tables();

/**
* Writes the archive to a single file
* @param files
*/
void write_single_file_archive(std::vector<ArchiveFileInfo> const& files);

/**
* Writes the metadata section of the single file archive
* @param archive_writer
* @param files
*/
void
write_archive_metadata(FileWriter& archive_writer, std::vector<ArchiveFileInfo> const& files);

/**
* Writes the file section of the single file archive
* @param archive_writer
* @param files
*/
void write_archive_files(FileWriter& archive_writer, std::vector<ArchiveFileInfo> const& files);

/**
* Writes the header section of the single file archive
* @param archive_writer
* @param metadata_section_size
*/
void write_archive_header(FileWriter& archive_writer, size_t metadata_section_size);

/**
* Updates the metadata db with the archive's metadata (id, size, timestamp ranges, etc.)
Expand All @@ -177,23 +212,36 @@ class ArchiveWriter {
*/
void print_archive_stats();

/**
* Write the timestamp dictionary as a dedicated file for multi-file archives.
*
* Note: the timestamp dictionary will be moved into the metadata region of multi-file archives
* in a follow-up PR.
* @return the compressed size of the Timestamp Dictionary in bytes
*/
size_t write_timestamp_dict();

static constexpr size_t cReadBlockSize = 4 * 1024;

size_t m_encoded_message_size{};
size_t m_uncompressed_size{};
size_t m_compressed_size{};
int64_t m_next_log_event_id{};

std::string m_id;

std::string m_archives_dir;
std::string m_archive_path;
std::string m_encoded_messages_dir;

std::shared_ptr<VariableDictionaryWriter> m_var_dict;
std::shared_ptr<LogTypeDictionaryWriter> m_log_dict;
std::shared_ptr<LogTypeDictionaryWriter> m_array_dict; // log type dictionary for arrays
std::shared_ptr<TimestampDictionaryWriter> m_timestamp_dict;
TimestampDictionaryWriter m_timestamp_dict;
std::shared_ptr<clp::GlobalMySQLMetadataDB> m_metadata_db;
int m_compression_level{};
bool m_print_archive_stats{};
bool m_single_file_archive{};
size_t m_min_table_size{};

SchemaMap m_schema_map;
Expand Down
Loading

0 comments on commit f1876cd

Please sign in to comment.