From 870faa1ebe683ad2212bd8231f7e7ce1d29ae200 Mon Sep 17 00:00:00 2001 From: Lanzheng Liu Date: Tue, 17 Oct 2023 17:38:17 +0800 Subject: [PATCH] multi-processor zfile builder Signed-off-by: Lanzheng Liu --- src/overlaybd/zfile/compressor.h | 6 +- src/overlaybd/zfile/test/test.cpp | 72 ++++++++ src/overlaybd/zfile/zfile.cpp | 287 +++++++++++++++++++++++++++--- src/tools/overlaybd-commit.cpp | 3 + 4 files changed, 343 insertions(+), 25 deletions(-) diff --git a/src/overlaybd/zfile/compressor.h b/src/overlaybd/zfile/compressor.h index 76c4fb5a..a56605b6 100644 --- a/src/overlaybd/zfile/compressor.h +++ b/src/overlaybd/zfile/compressor.h @@ -60,10 +60,12 @@ class CompressArgs { photon::fs::IFile *fdict = nullptr; std::unique_ptr dict_buf = nullptr; CompressOptions opt; + bool overwrite_header; + int workers; CompressArgs(const CompressOptions &opt, photon::fs::IFile *dict = nullptr, - unsigned char *dict_buf = nullptr) - : fdict(dict), dict_buf(dict_buf), opt(opt) { + unsigned char *dict_buf = nullptr, bool overwrite_header = false, int workers = 1) + : fdict(dict), dict_buf(dict_buf), opt(opt), overwrite_header(overwrite_header), workers(workers) { if (fdict || dict_buf) { this->opt.use_dict = 1; } diff --git a/src/overlaybd/zfile/test/test.cpp b/src/overlaybd/zfile/test/test.cpp index b685e415..4ee05a22 100644 --- a/src/overlaybd/zfile/test/test.cpp +++ b/src/overlaybd/zfile/test/test.cpp @@ -277,6 +277,78 @@ TEST_F(ZFileTest, dsa) { ASSERT_EQ(ret, 0); } +TEST_F(ZFileTest, verify_builder) { + auto fn_src = "verify.data"; + auto fn_zfile = "verify.zfile"; + auto fn_zfile_1 = "verify.zfile.1"; + auto src = lfs->open(fn_src, O_CREAT | O_TRUNC | O_RDWR, 0644); + if (src == nullptr) { + LOG_ERROR("failed to open file: `(`)", errno, strerror(errno)); + return; + } + randwrite(src, write_times); + struct stat _st; + if (src->fstat(&_st) != 0) { + LOG_ERROR("failed randwrite src file: `(`)", errno, strerror(errno)); + return; + } + + // zfile builder multi-processor + auto dst = lfs->open(fn_zfile, O_CREAT | O_TRUNC | O_RDWR, 0644); + if (!dst) { + LOG_ERROR("failed to open file: `(`)", errno, strerror(errno)); + } + DEFER({delete dst;}); + ZFile::CompressOptions opt; + opt.verify = 1; + opt.block_size = 4096; + ZFile::CompressArgs zfile_args(opt); + zfile_args.workers = 4; + auto zfile_builder = ZFile::new_zfile_builder(dst, &zfile_args, false); + src->lseek(0, 0); + char buf[16*1024]; + while (true) { + auto sz = rand() % 8192 + 1; + auto rc = src->read(buf, sz); + if (rc <= 0) break; + zfile_builder->write(buf, rc); + } + zfile_builder->close(); + + // zfile builder + ZFile::CompressOptions opt_1; + opt_1.verify = 1; + opt_1.block_size = 4096; + ZFile::CompressArgs zfile_args_1(opt_1); + zfile_args_1.workers = 1; + auto dst_1 = lfs->open(fn_zfile_1, O_CREAT | O_TRUNC | O_RDWR, 0644); + if (!dst_1) { + LOG_ERROR("failed to open file: `(`)", errno, strerror(errno)); + } + DEFER({delete dst_1;}); + auto zfile_builder_1 = ZFile::new_zfile_builder(dst_1, &zfile_args_1, false); + src->lseek(0, 0); + while (true) { + auto sz = rand() % 8192 + 1; + auto rc = src->read(buf, sz); + if (rc <= 0) break; + zfile_builder_1->write(buf, rc); + } + zfile_builder_1->close(); + + EXPECT_EQ(dst->lseek(0, SEEK_CUR), dst_1->lseek(0, SEEK_CUR)); + dst->lseek(0, 0); + dst_1->lseek(0, 0); + char buf_1[16*1024]; + while (true) { + auto rc = dst->read(buf, 8192); + auto rc_1 = dst_1->read(buf_1, 8192); + EXPECT_EQ(rc, rc_1); + EXPECT_EQ(memcmp(buf, buf_1, rc), 0); + if (rc == 0) break; + } +} + int main(int argc, char **argv) { auto seed = 154702356; cerr << "seed = " << seed << endl; diff --git a/src/overlaybd/zfile/zfile.cpp b/src/overlaybd/zfile/zfile.cpp index b2ab970e..43f99946 100644 --- a/src/overlaybd/zfile/zfile.cpp +++ b/src/overlaybd/zfile/zfile.cpp @@ -15,8 +15,6 @@ */ #include "zfile.h" -#include -#include #include #include #include @@ -29,9 +27,12 @@ #include #include #include +#include +#include #include "crc32/crc32c.h" #include "compressor.h" -#include "photon/common/alog.h" +#include +#include using namespace photon::fs; @@ -559,17 +560,23 @@ ssize_t compress_data(ICompressor *compressor, const unsigned char *buf, size_t return compressed_len; } -class ZFileBuilder : public VirtualReadOnlyFile { +class ZFileBuilderBase : public VirtualReadOnlyFile { public: - ZFileBuilder(IFile *file, const CompressArgs *args, bool ownership) - : m_dest(file), m_opt(args->opt), m_ownership(ownership) { + virtual int init() = 0; + virtual int fini() = 0; +}; +class ZFileBuilder : public ZFileBuilderBase { +public: + ZFileBuilder(IFile *file, const CompressArgs *args, bool ownership) + : m_dest(file), m_args(args), m_ownership(ownership) { + m_opt = m_args->opt; LOG_INFO("create stream compressing object. [ block size: `, type: `, enable_checksum: `]", m_opt.block_size, m_opt.algo, m_opt.verify); } - int init(const CompressArgs *args) { - m_compressor = create_compressor(args); + int init() { + m_compressor = create_compressor(m_args); if (m_compressor == nullptr) { LOG_ERRNO_RETURN(0, -1, "create compressor failed."); } @@ -626,16 +633,19 @@ class ZFileBuilder : public VirtualReadOnlyFile { auto ret = write_header_trailer(m_dest, false, true, true, pht); if (ret < 0) LOG_ERRNO_RETURN(0, -1, "failed to write trailer"); - LOG_INFO("overwrite file header."); - ret = write_header_trailer(m_dest, true, false, true, pht, 0); - if (ret < 0) { - LOG_ERRNO_RETURN(0, -1, "failed to overwrite header"); + if (m_args->overwrite_header) { + LOG_INFO("overwrite file header."); + ret = write_header_trailer(m_dest, true, false, true, pht, 0); + if (ret < 0) { + LOG_ERRNO_RETURN(0, -1, "failed to overwrite header"); + } } return 0; } virtual int close() override { - auto ret = fini(); + if (fini() < 0) + return -1; delete m_compressor; delete[] compressed_data; if (m_ownership) { @@ -683,6 +693,7 @@ class ZFileBuilder : public VirtualReadOnlyFile { off_t moffset = 0; size_t raw_data_size = 0; size_t m_buf_size = 0; + const CompressArgs *m_args; CompressOptions m_opt; ICompressor *m_compressor = nullptr; bool m_ownership = false; @@ -696,7 +707,231 @@ class ZFileBuilder : public VirtualReadOnlyFile { UNIMPLEMENTED(int fstat(struct stat *buf) override); }; -// static std::unique_ptr +// multi-processor supported zfile builder +class ZFileBuilderMP : public ZFileBuilderBase { +public: + ZFileBuilderMP(IFile *file, const CompressArgs *args, bool ownership) + : m_dest(file), m_args(args), m_ownership(ownership) { + m_workers = args->workers; + m_opt = m_args->opt; + LOG_INFO("create multi-processor stream compressing object. [ block size: `, alog: `, enable_checksum: `, workers: `]", + m_opt.block_size, m_opt.algo, m_opt.verify, m_workers); + } + + class WorkerCtx { + public: + int id; + bool writable = false; + unsigned char* ibuf = nullptr; + unsigned char* obuf = nullptr; + size_t size; + size_t buf_size; + photon::semaphore writable_sem; + photon::semaphore compress_sem; + photon::semaphore write_sem; + int result = 0; + + WorkerCtx(int id, size_t buf_size) + : id(id), buf_size(buf_size), writable_sem(1), compress_sem(0), write_sem(0) { + ibuf = new unsigned char[buf_size]; + obuf = new unsigned char[buf_size]; + } + + ~WorkerCtx() { + delete ibuf; + delete obuf; + } + void start_compress(int isize) { + writable = false; + size = isize; + compress_sem.signal(1); + } + }; + + int init() { + auto pht = new (m_ht)(CompressionFile::HeaderTrailer); + pht->set_compress_option(m_opt); + LOG_INFO("write header."); + auto ret = write_header_trailer(m_dest, true, false, true, pht); + if (ret < 0) { + LOG_ERRNO_RETURN(0, -1, "failed to write header"); + } + moffset = + CompressionFile::HeaderTrailer::SPACE + 0; // opt.dict_size; + // currently dictionary is not supported. + m_buf_size = m_opt.block_size + BUF_SIZE; + cur_id = 0; + for (int i = 0; i < m_workers; i++) + workers.emplace_back(new WorkerCtx(i, m_buf_size)); + + for (int i = 0; i < m_workers; i++) { + ths.emplace_back([&, id=i] { + photon::init(photon::INIT_EVENT_EPOLL, photon::INIT_IO_NONE); + DEFER(photon::fini()); + + auto ctx = workers[id]; + auto next_ctx = workers[(id+1)%m_workers]; + auto compressor = create_compressor(m_args); + if (compressor == nullptr) { + ctx->result = -1; + LOG_ERRNO_RETURN(0, -1, "failed to create compressor"); + } + DEFER(delete compressor); + + while (true) { + ctx->compress_sem.wait(1); + if (m_stop_flag && ctx->size == 0) { + break; + } + auto compressed_size = + compress_data(compressor, ctx->ibuf, ctx->size, ctx->obuf, ctx->buf_size, m_opt.verify); + if (compressed_size < 0) { + ctx->result = -1; + LOG_ERRNO_RETURN(EIO, -1, "failed to compress"); + } + + ctx->size = 0; + // ibuf is ready to write + ctx->writable_sem.signal(1); + + ctx->write_sem.wait(1); + moffset += compressed_size; + m_block_len.push_back(compressed_size); + if (m_dest->write(ctx->obuf, compressed_size) != compressed_size) { + ctx->result = -1; + LOG_ERRNO_RETURN(0, -1, "failed to write compressed data"); + } + next_ctx->write_sem.signal(1); + } + return 0; + }); + } + + workers[0]->write_sem.signal(1); + return 0; + } + + int fini() { + if (reserved_size) { + workers[cur_id]->start_compress(reserved_size); + } + + // wait for workers + m_stop_flag = true; + for (int i = 0; i < m_workers; i++) + workers[i]->compress_sem.signal(1); + for (auto &th : ths) { + th.join(); + } + for (int i = 0; i < m_workers; i++) { + if (workers[i]->result < 0) { + LOG_ERROR_RETURN(0, -1, "failed to compress data"); + } + } + + // compress done + uint64_t index_offset = moffset; + uint64_t index_size = m_block_len.size(); + ssize_t index_bytes = index_size * sizeof(uint32_t); + LOG_INFO("write index (offset: `, count: ` size: `)", index_offset, index_size, index_bytes); + if (m_dest->write(&m_block_len[0], index_bytes) != index_bytes) { + LOG_ERRNO_RETURN(0, -1, "failed to write index."); + } + auto pht = (CompressionFile::HeaderTrailer *)m_ht; + pht->index_crc = crc32::crc32c(&m_block_len[0], index_bytes); + LOG_INFO("index crc: ", pht->index_crc); + pht->index_offset = index_offset; + pht->index_size = index_size; + pht->original_file_size = raw_data_size; + LOG_INFO("write trailer."); + auto ret = write_header_trailer(m_dest, false, true, true, pht); + if (ret < 0) + LOG_ERRNO_RETURN(0, -1, "failed to write trailer"); + if (m_args->overwrite_header) { + LOG_INFO("overwrite file header."); + ret = write_header_trailer(m_dest, true, false, true, pht, 0); + if (ret < 0) { + LOG_ERRNO_RETURN(0, -1, "failed to overwrite header"); + } + } + return 0; + } + + virtual int close() override { + if (fini() < 0) { + return -1; + } + if (m_ownership) { + m_dest->close(); + } + return 0; + } + + inline void copy(WorkerCtx *ctx, const void *from, size_t count, off_t offset) { + if (!ctx->writable) { + ctx->writable_sem.wait(1); + ctx->writable = true; + } + memcpy(ctx->ibuf + offset, from, count); + } + + virtual ssize_t write(const void *buf, size_t count) override { + raw_data_size += count; + auto expected_ret = count; + auto ctx = workers[cur_id]; + + if (reserved_size != 0) { + if (reserved_size + count < m_opt.block_size) { + copy(ctx, buf, count, reserved_size); + reserved_size += count; + return expected_ret; // save uncompressed buffer and return write count; + } + auto delta = m_opt.block_size - reserved_size; + copy(ctx, buf, delta, reserved_size); + buf = (const void *)((const char*)buf + delta); + count -= delta; + + ctx->start_compress(reserved_size + delta); + cur_id = (cur_id+1)%m_workers; + ctx = workers[cur_id]; + reserved_size = 0; + } + + for (off_t i = 0; i < (ssize_t)count; i += m_opt.block_size) { + if (i + m_opt.block_size > (ssize_t)count) { + copy(ctx, buf+i, count-i, 0); + reserved_size = count - i; + break; + } + copy(ctx, buf+i, m_opt.block_size, 0); + ctx->start_compress(m_opt.block_size); + cur_id = (cur_id+1)%m_workers; + ctx = workers[cur_id]; + } + LOG_DEBUG("compressed ` bytes done. reserved: `", expected_ret, reserved_size); + return expected_ret; + } + + std::vector workers; + bool m_stop_flag = false; + int m_workers; + IFile *m_dest; + off_t moffset = 0; + size_t raw_data_size = 0; + size_t m_buf_size = 0; + const CompressArgs *m_args; + CompressOptions m_opt; + bool m_ownership = false; + std::vector m_block_len; + std::vector ths; + size_t reserved_size = 0; + char m_ht[CompressionFile::HeaderTrailer::SPACE]{}; + int cur_id; + + UNIMPLEMENTED_POINTER(IFileSystem *filesystem() override); + UNIMPLEMENTED(int fstat(struct stat *buf) override); +}; + bool load_jump_table(IFile *file, CompressionFile::HeaderTrailer *pheader_trailer, CompressionFile::JumpTable &jump_table, bool trailer = true) { char buf[CompressionFile::HeaderTrailer::SPACE]; @@ -844,7 +1079,6 @@ static int write_header_trailer(IFile *file, bool is_header, bool is_sealed, boo } int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) { - if (args == nullptr) { LOG_ERROR_RETURN(EINVAL, -1, "CompressArgs is null"); } @@ -941,10 +1175,12 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) { ret = write_header_trailer(as, false, true, true, pht); if (ret < 0) LOG_ERRNO_RETURN(0, -1, "failed to write trailer"); - LOG_INFO("overwrite file header."); - ret = write_header_trailer(as, true, false, true, pht, 0); - if (ret < 0) { - LOG_ERRNO_RETURN(0, -1, "failed to overwrite header"); + if (args->overwrite_header) { + LOG_INFO("overwrite file header."); + ret = write_header_trailer(as, true, false, true, pht, 0); + if (ret < 0) { + LOG_ERRNO_RETURN(0, -1, "failed to overwrite header"); + } } return 0; } @@ -1021,11 +1257,16 @@ int is_zfile(IFile *file) { } IFile *new_zfile_builder(IFile *file, const CompressArgs *args, bool ownership) { - auto r = new ZFileBuilder(file, args, ownership); - if (r->init(args) != 0) { - delete r; + ZFileBuilderBase *builder; + if (args->workers == 1) { + builder = new ZFileBuilder(file, args, ownership); + } else { + builder = new ZFileBuilderMP(file, args, ownership); + } + if (builder->init() != 0) { + delete builder; LOG_ERRNO_RETURN(0, nullptr, "init zfileStreamWriter failed."); } - return r; + return builder; } } // namespace ZFile diff --git a/src/tools/overlaybd-commit.cpp b/src/tools/overlaybd-commit.cpp index ae26c7ab..e3353113 100644 --- a/src/tools/overlaybd-commit.cpp +++ b/src/tools/overlaybd-commit.cpp @@ -58,6 +58,7 @@ int main(int argc, char **argv) { bool build_fastoci = false; bool tar = false, rm_old = false, seal = false, commit_sealed = false; bool verbose = false; + int compress_threads = 1; CLI::App app{"this is overlaybd-commit"}; app.add_option("-m", commit_msg, "add some custom message if needed"); @@ -77,6 +78,7 @@ int main(int argc, char **argv) { app.add_option("commit_file", commit_file_path, "commit file path")->type_name("FILEPATH"); app.add_flag("--seal", seal, "seal only, data_file is output itself")->default_val(false); app.add_flag("--commit_sealed", commit_sealed, "commit sealed, index_file is output")->default_val(false); + app.add_option("--compress_threads", compress_threads, "compress threads")->default_val(1); app.add_flag("--verbose", verbose, "output debug info")->default_val(false); CLI11_PARSE(app, argc, argv); build_turboOCI = build_turboOCI || build_fastoci; @@ -145,6 +147,7 @@ int main(int argc, char **argv) { fout = open_file(fs, commit_file_path.c_str(), O_RDWR | O_EXCL | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); ZFile::CompressArgs zfile_args(opt); + zfile_args.workers = compress_threads; zfile_builder = ZFile::new_zfile_builder(fout, &zfile_args, false); out = zfile_builder; } else {