diff --git a/CMakeLists.txt b/CMakeLists.txt index 82650611..86b30a8b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,6 +42,16 @@ set(ENABLE_MIMIC_VDSO off) option(BUILD_CURL_FROM_SOURCE "Compile static libcurl" off) find_package(photon REQUIRED) find_package(tcmu REQUIRED) +find_package(yamlcpp) +if (NOT yamlcpp_FOUND) + FetchContent_Declare( + yamlcpp + GIT_REPOSITORY https://github.com/jbeder/yaml-cpp.git + GIT_TAG 0.8.0 + ) + FetchContent_MakeAvailable(yamlcpp) +endif() + if(BUILD_TESTING) enable_testing() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 599b8b5b..9c79d142 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -13,6 +13,7 @@ add_library(overlaybd_image_lib switch_file.cpp bk_download.cpp prefetch.cpp + tools/sha256file.cpp ) target_include_directories(overlaybd_image_lib PUBLIC ${CURL_INCLUDE_DIRS} diff --git a/src/bk_download.cpp b/src/bk_download.cpp index 61179082..72e3e70e 100644 --- a/src/bk_download.cpp +++ b/src/bk_download.cpp @@ -31,6 +31,7 @@ #include #include "switch_file.h" #include "image_file.h" +#include "tools/sha256file.h" using namespace photon::fs; @@ -38,43 +39,6 @@ static constexpr size_t ALIGNMENT = 4096; namespace BKDL { -std::string sha256sum(const char *fn) { - constexpr size_t BUFFERSIZE = 65536; - int fd = open(fn, O_RDONLY | O_DIRECT); - if (fd < 0) { - LOG_ERROR("failed to open `", fn); - return ""; - } - DEFER(close(fd);); - - struct stat stat; - if (::fstat(fd, &stat) < 0) { - LOG_ERROR("failed to stat `", fn); - return ""; - } - SHA256_CTX ctx = {0}; - SHA256_Init(&ctx); - __attribute__((aligned(ALIGNMENT))) char buffer[65536]; - unsigned char sha[32]; - ssize_t recv = 0; - for (off_t offset = 0; offset < stat.st_size; offset += BUFFERSIZE) { - recv = pread(fd, &buffer, BUFFERSIZE, offset); - if (recv < 0) { - LOG_ERROR("io error: `", fn); - return ""; - } - if (SHA256_Update(&ctx, buffer, recv) < 0) { - LOG_ERROR("sha256 calculate error: `", fn); - return ""; - } - } - SHA256_Final(sha, &ctx); - char res[SHA256_DIGEST_LENGTH * 2]; - for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) - sprintf(res + (i * 2), "%02x", sha[i]); - return "sha256:" + std::string(res, SHA256_DIGEST_LENGTH * 2); -} - bool check_downloaded(const std::string &dir) { std::string fn = dir + "/" + COMMIT_FILE_NAME; auto lfs = photon::fs::new_localfs_adaptor(); diff --git a/src/example_config/stream-conv.yaml b/src/example_config/stream-conv.yaml new file mode 100644 index 00000000..50e67072 --- /dev/null +++ b/src/example_config/stream-conv.yaml @@ -0,0 +1,13 @@ +globalConfig: + workDir: /tmp/stream_conv + udsAddr: /var/run/stream_conv.sock + # httpAddr: 127.0.0.1 + httpPort: 9101 + reusePort: true + + logConfig: + level: 1 + mode: stdout + rotateNum: 3 + limitSizeMB: 10 + path: /var/log/overlaybd/stream_convertor.log diff --git a/src/overlaybd/CMakeLists.txt b/src/overlaybd/CMakeLists.txt index f39ddfdd..d700f808 100644 --- a/src/overlaybd/CMakeLists.txt +++ b/src/overlaybd/CMakeLists.txt @@ -6,6 +6,7 @@ add_subdirectory(tar) add_subdirectory(extfs) add_subdirectory(gzip) add_subdirectory(gzindex) +add_subdirectory(stream_convertor) add_library(overlaybd_lib INTERFACE) target_include_directories(overlaybd_lib INTERFACE diff --git a/src/overlaybd/gzindex/gzfile.cpp b/src/overlaybd/gzindex/gzfile.cpp index 15c174fd..55af02da 100644 --- a/src/overlaybd/gzindex/gzfile.cpp +++ b/src/overlaybd/gzindex/gzfile.cpp @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include "gzfile_index.h" @@ -356,7 +355,6 @@ ssize_t GzFile::pread(void *buf, size_t count, off_t offset) { if (p == nullptr) { LOG_ERRNO_RETURN(0, -1, "Failed to seek_index(,`)", offset); } - //LOG_DEBUG("offset:`, index->de_pos:", offset, p->de_pos+0); return extract(p, offset, (unsigned char*)buf, count); } diff --git a/src/overlaybd/gzindex/gzfile.h b/src/overlaybd/gzindex/gzfile.h index 8d512bd7..7fecb62e 100644 --- a/src/overlaybd/gzindex/gzfile.h +++ b/src/overlaybd/gzindex/gzfile.h @@ -16,6 +16,9 @@ #pragma once #include "photon/fs/filesystem.h" +#include "gzfile_index.h" + + extern photon::fs::IFile* new_gzfile(photon::fs::IFile* gzip_file, photon::fs::IFile* index, bool ownership = false); //chunksize: @@ -32,6 +35,7 @@ extern photon::fs::IFile* new_gzfile(photon::fs::IFile* gzip_file, photon::fs::I //0: no compression //1: best speed //9: best compression -extern int create_gz_index(photon::fs::IFile* gzip_file, const char *index_file_path, off_t chunk_size=1048576, int dict_compress_algo=1, int dict_compress_level=6); +extern int create_gz_index(photon::fs::IFile* gzip_file, const char *index_file_path, + off_t chunk_size=GZ_CHUNK_SIZE, int dict_compress_algo=GZ_DICT_COMPERSS_ALGO, int dict_compress_level=GZ_COMPRESS_LEVEL); bool is_gzfile(photon::fs::IFile* file); diff --git a/src/overlaybd/gzindex/gzfile_index.h b/src/overlaybd/gzindex/gzfile_index.h index eb0e1f7d..5438feaf 100644 --- a/src/overlaybd/gzindex/gzfile_index.h +++ b/src/overlaybd/gzindex/gzfile_index.h @@ -18,8 +18,16 @@ #include #include #include +#include #include +#include #include "photon/common/checksum/crc32c.h" +#include "photon/fs/filesystem.h" + +#define GZ_CHUNK_SIZE 1048576 +#define GZ_DICT_COMPERSS_ALGO 1 +#define GZ_COMPRESS_LEVEL 6 + #define WINSIZE 32768U #define DEFLATE_BLOCK_UNCOMPRESS_MAX_SIZE 65536U #define GZFILE_INDEX_MAGIC "ddgzidx" @@ -76,3 +84,13 @@ struct IndexEntry { typedef std::vector INDEX; + +class IndexFilterRecorder; +IndexFilterRecorder *new_index_filter(IndexFileHeader *h, INDEX *index, photon::fs::IFile *save_as); +void delete_index_filter(IndexFilterRecorder *&); + +int init_index_header(photon::fs::IFile* src, IndexFileHeader &h, off_t span, int dict_compress_algo, int dict_compress_level); + +int create_index_entry(z_stream strm, IndexFilterRecorder *filter, off_t en_pos, off_t de_pos, unsigned char *window); + +int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file, ssize_t gzip_file_size = -1); diff --git a/src/overlaybd/gzindex/gzip_index_create.cpp b/src/overlaybd/gzindex/gzip_index_create.cpp index cf1b0ff9..8bec915f 100644 --- a/src/overlaybd/gzindex/gzip_index_create.cpp +++ b/src/overlaybd/gzindex/gzip_index_create.cpp @@ -13,13 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. */ - #include #include #include #include #include + #include "gzfile_index.h" + #include "photon/common/alog.h" #include "photon/common/alog-stdstring.h" #include "photon/fs/localfs.h" @@ -179,9 +180,30 @@ static int dict_compress(const IndexFileHeader& h, return -1; } -static int build_index(IndexFileHeader& h,photon::fs::IFile *gzfile, INDEX &index, photon::fs::IFile* index_file) { - IndexFilterRecorder filter(&h, &index, index_file); +int create_index_entry(z_stream strm, IndexFilterRecorder *filter, off_t en_pos, off_t de_pos, unsigned char *window){ + LOG_DEBUG("`",VALUE(strm.data_type)); + if ((strm.data_type & EACH_DEFLATE_BLOCK_BIT) && !(strm.data_type & LAST_DEFLATE_BLOCK_BIT)) { + if (filter->record(strm.data_type & 7, en_pos, de_pos, strm.avail_out, window) != 0) { + return -1; + } + } + return 0; +} +IndexFilterRecorder* new_index_filter(IndexFileHeader *h, INDEX *index, photon::fs::IFile *save_as) +{ + return new IndexFilterRecorder(h, index, save_as); +} + +void delete_index_filter(IndexFilterRecorder *&idx_filter) { + delete idx_filter; + idx_filter = nullptr; +} + +static int build_index(IndexFileHeader& h,photon::fs::IFile *gzfile, INDEX &index, photon::fs::IFile* index_file) { + // IndexFilterRecorder filter(&h, &index, index_file); + auto filter = new IndexFilterRecorder(&h, &index, index_file); + DEFER(delete filter); int32_t inbuf_size = WINSIZE; unsigned char *inbuf = new unsigned char[inbuf_size]; DEFER(delete []inbuf); @@ -216,7 +238,6 @@ static int build_index(IndexFileHeader& h,photon::fs::IFile *gzfile, INDEX &inde strm.avail_out = WINSIZE; strm.next_out = window; } - ttin += strm.avail_in; ttout += strm.avail_out; ret = inflate(&strm, Z_BLOCK); @@ -231,12 +252,10 @@ static int build_index(IndexFileHeader& h,photon::fs::IFile *gzfile, INDEX &inde LOG_ERRNO_RETURN(0, -1, "Fail to inflate. ret:`", ret); } //TODO Here generate crc32 for uncompressed data block - - if ((strm.data_type & EACH_DEFLATE_BLOCK_BIT) && !(strm.data_type & LAST_DEFLATE_BLOCK_BIT)) { - if (filter.record(strm.data_type & 7, ttin, ttout, strm.avail_out, window) != 0) { - LOG_ERRNO_RETURN(ret, -1, "Failed to add_index_entry"); - } + if (create_index_entry(strm, filter, ttin, ttout, window) != 0){ + LOG_ERRNO_RETURN(ret, -1, "Failed to add_index_entry"); } + } while (strm.avail_in != 0); } while (ret != Z_STREAM_END); return 0; @@ -258,15 +277,20 @@ static int get_compressed_index(const IndexFileHeader& h, const INDEX& index, un out_len = index_len; return 0; } - + LOG_INFO("index crc: `", crc32(0, buf, index_len)); return zlib_compress(h.dict_compress_level, buf, index_len, out, out_len); } -static int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file) { +int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file, ssize_t gzip_file_size) { int indx_cmpr_buf_len = index.size() * sizeof(IndexEntry) * 2 + 4096; unsigned char *buf = new unsigned char[indx_cmpr_buf_len]; DEFER(delete []buf); + if (gzip_file_size != -1) { + LOG_INFO("save gzip file size: `", gzip_file_size); + h.gzip_file_size = gzip_file_size; + } + if (get_compressed_index(h, index, buf, indx_cmpr_buf_len) != 0) { LOG_ERROR_RETURN(0, -1, "Failed to get_compress_index"); } @@ -293,6 +317,28 @@ static int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFil return 0; } +int init_index_header(photon::fs::IFile* src, IndexFileHeader &h, off_t span, int dict_compress_algo, int dict_compress_level) { + + struct stat sbuf; + if (src->fstat(&sbuf) != 0) { + LOG_ERRNO_RETURN(0, -1, "Faild to gzip_file->fstat()"); + } + memset(&h, 0, sizeof(h)); + strncpy(h.magic, "ddgzidx", sizeof(h.magic)); + h.major_version =1; + h.minor_version =0; + h.dict_compress_algo = dict_compress_algo; + h.dict_compress_level = dict_compress_level; + h.flag=0; + h.index_size = sizeof(struct IndexEntry); + h.span = span; + h.window= WINSIZE; + h.gzip_file_size= sbuf.st_size; + memset(h.reserve, 0, sizeof(h.reserve)); + h.index_start = sizeof(h); + return 0; +} + //int create_gz_index(photon::fs::IFile* gzip_file, const char *index_file_path, off_t span, unsigned char dict_compress_algo) { //int create_gz_index(photon::fs::IFile* gzip_file, off_t span, const char *index_file_path) { int create_gz_index(photon::fs::IFile* gzip_file, const char *index_file_path, off_t span, int dict_compress_algo, int dict_compress_level) { @@ -310,11 +356,6 @@ int create_gz_index(photon::fs::IFile* gzip_file, const char *index_file_path, o LOG_ERRNO_RETURN(0, -1, "Span is too small, must be greater than 100, span:`", span); } - struct stat sbuf; - if (gzip_file->fstat(&sbuf) != 0) { - LOG_ERRNO_RETURN(0, -1, "Faild to gzip_file->fstat()"); - } - photon::fs::IFile *index_file = photon::fs::open_localfile_adaptor(index_file_path, O_RDWR | O_CREAT | O_TRUNC, 0644); if (index_file == nullptr) { LOG_ERROR_RETURN(0, -1, "Failed to open(`)", index_file_path); @@ -322,21 +363,15 @@ int create_gz_index(photon::fs::IFile* gzip_file, const char *index_file_path, o DEFER(index_file->close()); IndexFileHeader h; - memset(&h, 0, sizeof(h)); - strncpy(h.magic, "ddgzidx", sizeof(h.magic)); - h.major_version =1; - h.minor_version =0; - h.dict_compress_algo = dict_compress_algo; - h.dict_compress_level = dict_compress_level; - h.flag=0; - h.index_size = sizeof(struct IndexEntry); - h.span = span; - h.window= WINSIZE; - h.gzip_file_size= sbuf.st_size; - memset(h.reserve, 0, sizeof(h.reserve)); - h.index_start = sizeof(h); - + if (init_index_header(gzip_file, h, span, dict_compress_algo, dict_compress_level) != 0) { + LOG_ERRNO_RETURN(0, -1, "init index header failed."); + } INDEX index; + DEFER({ + for (auto it : index) { + delete it; + } + }); int ret = build_index(h, gzip_file, index, index_file); if (ret != 0) { LOG_ERRNO_RETURN(0, -1, "Faild to build_index"); diff --git a/src/overlaybd/gzindex/test/CMakeLists.txt b/src/overlaybd/gzindex/test/CMakeLists.txt index 650ed19b..9decda13 100644 --- a/src/overlaybd/gzindex/test/CMakeLists.txt +++ b/src/overlaybd/gzindex/test/CMakeLists.txt @@ -6,7 +6,8 @@ link_directories($ENV{GTEST}/lib) add_executable(gzindex_test test.cpp) target_include_directories(gzindex_test PUBLIC ${PHOTON_INCLUDE_DIR}) -target_link_libraries(gzindex_test gtest gtest_main gflags pthread photon_static gzindex_lib cache_lib) +target_link_libraries(gzindex_test gtest gtest_main gflags pthread photon_static + gzindex_lib gzip_lib cache_lib checksum_lib) add_test( NAME gzindex_test diff --git a/src/overlaybd/gzindex/test/test.cpp b/src/overlaybd/gzindex/test/test.cpp index 1c4fa18e..69c7086e 100644 --- a/src/overlaybd/gzindex/test/test.cpp +++ b/src/overlaybd/gzindex/test/test.cpp @@ -15,17 +15,26 @@ */ #include "../gzfile.h" +#include "../../gzip/gz.h" #include "../../cache/gzip_cache/cached_fs.h" #include "../../cache/cache.h" #include #include #include +#include #include +#include +#include +#include +// #include #include #include +#include #include #include +#include +#include "../../../tools/sha256file.h" struct PreadTestCase { off_t offset; @@ -82,8 +91,8 @@ class GzIndexTest : public ::testing::Test { void test_pread(PreadTestCase t) { char *buf1 = new char[t.count]; char *buf2 = new char[t.count]; - DEFER(delete []buf1); - DEFER(delete []buf2); + DEFER(delete[] buf1); + DEFER(delete[] buf2); ssize_t ret1 = defile->pread(buf1, t.count, t.offset); ssize_t ret2 = gzfile->pread(buf2, t.count, t.offset); EXPECT_EQ(ret1, t.ret); @@ -114,7 +123,7 @@ class GzIndexTest : public ::testing::Test { static int buildDataFile() { // uncompressed data unsigned char *buf = new unsigned char[vsize]; - DEFER(delete []buf); + DEFER(delete[] buf); for (size_t i = 0; i < vsize; i++) { auto j = rand() % 256; buf[i] = j; @@ -129,7 +138,7 @@ class GzIndexTest : public ::testing::Test { // gzip data size_t gzlen = compressBound(vsize) + 4096; unsigned char *gzbuf = new unsigned char[gzlen]; - DEFER(delete []gzbuf); + DEFER(delete[] gzbuf); if (gzip_compress(buf, vsize, gzbuf, gzlen) != 0) { LOG_ERRNO_RETURN(0, -1, "failed to gzip_compress(...)"); } @@ -155,10 +164,12 @@ class GzIndexTest : public ::testing::Test { return 0; } - static int gzip_compress(unsigned char *in, size_t in_len, unsigned char *out, size_t &out_len) { + static int gzip_compress(unsigned char *in, size_t in_len, unsigned char *out, + size_t &out_len) { z_stream strm; memset(&strm, 0, sizeof(strm)); - int ret = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY); + int ret = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, + Z_DEFAULT_STRATEGY); if (ret != Z_OK) { LOG_ERRNO_RETURN(0, -1, "failed to deflateInit2(...)"); } @@ -185,14 +196,144 @@ const char *GzIndexTest::fn_defile = "/fdata"; const char *GzIndexTest::fn_gzdata = "/fdata.gz"; const char *GzIndexTest::fn_gzindex = "/findex"; +char uds_path[] = "/tmp/gzstream_test/stream_conv.sock"; + +int download(const std::string &url, const std::string &out) { + // if (::access(out.c_str(), 0) == 0) + // return 0; + auto base = std::string(basename(url.c_str())); + // download file + std::string cmd = "curl -sL -o " + out + " " + url; + LOG_INFO(VALUE(cmd.c_str())); + auto ret = system(cmd.c_str()); + if (ret != 0) { + LOG_ERRNO_RETURN(0, -1, "download failed: `", url.c_str()); + } + return 0; +} + +void handler(photon::net::ISocketStream *sock) { + ASSERT_NE(nullptr, sock); + LOG_DEBUG("Accepted"); + photon::thread_yield(); + char recv[65536]; + size_t count = 0; + auto dst = photon::fs::open_localfile_adaptor("/tmp/dest", O_TRUNC | O_CREAT | O_RDWR); + DEFER(delete dst); + // sock->read(recv, sizeof(ssize_t)); + // auto st_size = *(size_t*)recv; + auto gzstream = open_gzstream_file(sock, 0); + ASSERT_NE(gzstream, nullptr); + DEFER(delete gzstream); + while (true) { + auto readn = gzstream->read(recv, 65536); + if (readn <= 0) + break; + + count += readn; + dst->write(recv, readn); + } + auto fn_idx = gzstream->save_index(); + LOG_INFO("RECV `, fn_idx: `", count, fn_idx.c_str()); + + + // ASSERT_STREQ(sha256sum("/tmp/dest").c_str(), "sha256:562688d70dcd1596556e7c671c1266f6e9c22b4f4fb8344efa8bed88fc2bac7b"); + // ASSERT_STREQ(sha256sum(fn_idx.c_str()).c_str(), "sha256:af3ffd4965d83f3d235c48ce75e16a1f2edf12d0e5d82816d7066a8485aade82"); + +} + +void uds_server() { + auto sock = photon::net::new_uds_server(true); + DEFER({ delete (sock); }); + ASSERT_EQ(0, sock->bind(uds_path)); + ASSERT_EQ(0, sock->listen(100)); + char path[PATH_MAX]; + ASSERT_EQ(0, sock->getsockname(path, PATH_MAX)); + EXPECT_EQ(0, strcmp(path, uds_path)); + LOG_INFO("uds server listening `", path); + handler(sock->accept()); + photon::thread_yield_to(nullptr); +} + +void uds_client(photon::fs::IFile *file) { + + photon::thread_yield_to(nullptr); + auto cli = photon::net::new_uds_client(); + DEFER({ delete cli; }); + LOG_DEBUG("Connecting"); + auto sock = cli->connect(uds_path); + DEFER(delete sock); + LOG_DEBUG(VALUE(sock), VALUE(errno)); + char path[PATH_MAX]; + sock->getpeername(path, PATH_MAX); + EXPECT_EQ(0, strcmp(path, uds_path)); + struct stat st; + file->fstat(&st); + LOG_INFO("Connected `, start send file data(size: `)", path, st.st_size); + // sock->write(&st.st_size, sizeof(st.st_size)); + char buff[65536]; + auto count = 0; + while (true) { + auto readn = file->read(buff, 65536); + ASSERT_NE(readn, -1); + auto ret = sock->write(buff, readn); + count += readn; + ASSERT_EQ(ret, readn); + if (readn != 65536) + break; + } + LOG_INFO("SEND: `", count); + + return; +} + +TEST_F(GzIndexTest, stream) { + std::string workdir = "/tmp/gzstream_test/"; + mkdir(workdir.c_str(), 0755); + auto lfs = photon::fs::new_localfs_adaptor(workdir.c_str()); + LOG_INFO("start streamFile test"); + std::vector filelist = { + // "https://dadi-shared.oss-cn-beijing.aliyuncs.com/cri-containerd-cni-1.5.2-linux-amd64.tar.gz", + "https://github.com/containerd/containerd/releases/download/v1.5.17/cri-containerd-cni-1.5.17-linux-amd64.tar.gz", + "https://github.com/containerd/containerd/releases/download/v1.4.4/containerd-1.4.4-linux-amd64.tar.gz", + // "https://dadi-shared.oss-cn-beijing.aliyuncs.com/containerd-1.4.4-linux-amd64.tar.gz", + // "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.13.linux-amd64.tar.gz", + "https://go.dev/dl/go1.17.6.linux-amd64.tar.gz", + // "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz" + }; + std::vector tar_sha256sum = { + "sha256:02adc5074f59777d2ca74c8a0291659f69291865184c987d9c10e58f58b162c2", + // "sha256:05e8b01c1ddb6ba4f8c84e7dbc76529bdc09861f9ce17c213a49e8c334f184ed", + "sha256:0ccf983abf0b0fb64cc969079982bc34761ce22d7a3236a40d49d840d150e09a", + // "sha256:1041ec4e2f40156e0731be175388be4c67aeceb44829f988df213e9fd5f26dc9", + "sha256:562688d70dcd1596556e7c671c1266f6e9c22b4f4fb8344efa8bed88fc2bac7b" + }; + int i = 0; + for (auto test_tgz : filelist) { + auto jh1 = photon::thread_enable_join(photon::thread_create11(uds_server)); + std::string fn_test_tgz = basename(test_tgz.c_str()); + ASSERT_EQ( + 0, download(test_tgz, (workdir + fn_test_tgz).c_str())); + auto file = lfs->open(fn_test_tgz.c_str(), O_RDONLY); + std::string tar_sha256="", idx_sha256=""; + uds_client(file); + photon::thread_join(jh1); + auto dst_sha256 = sha256sum("/tmp/dest"); + ASSERT_STREQ(dst_sha256.c_str(), tar_sha256sum[i++].c_str()); + lfs->unlink(fn_test_tgz.c_str()); + } + remove(uds_path); + +} + TEST_F(GzIndexTest, pread) { std::vector t{ {0, 1, 1}, {0, 10, 10}, {1000000, 1000000, 1000000}, {2000000, 1500000, 1500000}, - {(off_t) vsize - 10, 10, 10}, - {(off_t) vsize - 1, 1, 1}, + {(off_t)vsize - 10, 10, 10}, + {(off_t)vsize - 1, 1, 1}, }; group_test_pread(t); } @@ -203,11 +344,11 @@ TEST_F(GzIndexTest, pread_oob) { {-1, 2, -1}, {-1, 10000, -1}, {-9999, 10000, -1}, - {(off_t) vsize, 1, 0}, - {(off_t) vsize - 1, 2, 1}, - {(off_t) vsize - 400, 1000, 400}, - {(off_t) vsize + 1, 1, 0}, - {(off_t) vsize + 10000, 10000, 0}, + {(off_t)vsize, 1, 0}, + {(off_t)vsize - 1, 2, 1}, + {(off_t)vsize - 400, 1000, 400}, + {(off_t)vsize + 1, 1, 0}, + {(off_t)vsize + 10000, 10000, 0}, }; group_test_pread(t); } @@ -218,8 +359,9 @@ TEST_F(GzIndexTest, pread_rand) { for (size_t i = 0; i < n; i++) { size_t x = rand() % vsize; size_t y = rand() % vsize; - if (x > y) std::swap(x, y); - t.push_back({(off_t) x , y - x, (ssize_t) (y - x)}); + if (x > y) + std::swap(x, y); + t.push_back({(off_t)x, y - x, (ssize_t)(y - x)}); } group_test_pread(t); } @@ -233,7 +375,6 @@ TEST_F(GzIndexTest, fstat) { EXPECT_EQ(static_cast(st.st_size), data_size); } - class GzCacheTest : public ::testing::Test { protected: static photon::fs::IFile *defile; @@ -307,8 +448,8 @@ class GzCacheTest : public ::testing::Test { void test_pread(PreadTestCase t) { char *buf1 = new char[t.count]; char *buf2 = new char[t.count]; - DEFER(delete []buf1); - DEFER(delete []buf2); + DEFER(delete[] buf1); + DEFER(delete[] buf2); ssize_t ret1 = defile->pread(buf1, t.count, t.offset); ssize_t ret2 = gzfile->pread(buf2, t.count, t.offset); EXPECT_EQ(ret1, t.ret); @@ -326,6 +467,7 @@ class GzCacheTest : public ::testing::Test { test_pread(t[i]); } } + private: static photon::fs::IFileSystem *lfs; static Cache::GzipCachedFs *cfs; @@ -339,7 +481,7 @@ class GzCacheTest : public ::testing::Test { static int buildDataFile() { // uncompressed data unsigned char *buf = new unsigned char[vsize]; - DEFER(delete []buf); + DEFER(delete[] buf); for (size_t i = 0; i < vsize; i++) { auto j = rand() % 256; buf[i] = j; @@ -354,7 +496,7 @@ class GzCacheTest : public ::testing::Test { // gzip data size_t gzlen = compressBound(vsize) + 4096; unsigned char *gzbuf = new unsigned char[gzlen]; - DEFER(delete []gzbuf); + DEFER(delete[] gzbuf); if (gzip_compress(buf, vsize, gzbuf, gzlen) != 0) { LOG_ERRNO_RETURN(0, -1, "failed to gzip_compress(...)"); } @@ -380,10 +522,12 @@ class GzCacheTest : public ::testing::Test { return 0; } - static int gzip_compress(unsigned char *in, size_t in_len, unsigned char *out, size_t &out_len) { + static int gzip_compress(unsigned char *in, size_t in_len, unsigned char *out, + size_t &out_len) { z_stream strm; memset(&strm, 0, sizeof(strm)); - int ret = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY); + int ret = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, + Z_DEFAULT_STRATEGY); if (ret != Z_OK) { LOG_ERRNO_RETURN(0, -1, "failed to deflateInit2(...)"); } @@ -419,14 +563,14 @@ TEST_F(GzCacheTest, cache_store) { std::vector t{ {0, 1, 1}, {5 << 20, 1, 1}, - {(off_t) vsize - 1, 1, 1}, + {(off_t)vsize - 1, 1, 1}, }; group_test_pread(t); unsigned char *cbuf1 = new unsigned char[vsize]; unsigned char *cbuf2 = new unsigned char[vsize]; - DEFER(delete []cbuf1); - DEFER(delete []cbuf2); + DEFER(delete[] cbuf1); + DEFER(delete[] cbuf2); auto fp1 = fopen("/tmp/gzip_src/fdata", "r"); auto fp2 = fopen("/tmp/gzip_cache_decompress/fdata", "r"); DEFER(fclose(fp1)); @@ -451,8 +595,8 @@ TEST_F(GzCacheTest, pread) { {0, 10, 10}, {1000000, 1000000, 1000000}, {2000000, 1500000, 1500000}, - {(off_t) vsize - 10, 10, 10}, - {(off_t) vsize - 1, 1, 1}, + {(off_t)vsize - 10, 10, 10}, + {(off_t)vsize - 1, 1, 1}, }; group_test_pread(t); } @@ -463,8 +607,9 @@ TEST_F(GzCacheTest, pread_rand) { for (size_t i = 0; i < n; i++) { size_t x = rand() % vsize; size_t y = rand() % vsize; - if (x > y) std::swap(x, y); - t.push_back({(off_t) x , y - x, (ssize_t) (y - x)}); + if (x > y) + std::swap(x, y); + t.push_back({(off_t)x, y - x, (ssize_t)(y - x)}); } group_test_pread(t); } @@ -475,11 +620,11 @@ TEST_F(GzCacheTest, pread_oob) { {-1, 2, -1}, {-1, 10000, -1}, {-9999, 10000, -1}, - {(off_t) vsize, 1, 0}, - {(off_t) vsize - 1, 2, 1}, - {(off_t) vsize - 400, 1000, 400}, - {(off_t) vsize + 1, 1, 0}, - {(off_t) vsize + 10000, 10000, 0}, + {(off_t)vsize, 1, 0}, + {(off_t)vsize - 1, 2, 1}, + {(off_t)vsize - 400, 1000, 400}, + {(off_t)vsize + 1, 1, 0}, + {(off_t)vsize + 10000, 10000, 0}, }; group_test_pread(t); } @@ -490,8 +635,9 @@ TEST_F(GzCacheTest, pread_little) { for (size_t i = 0; i < n; i++) { size_t x = rand() % vsize; size_t y = x + rand() % 4096; - if (y >= vsize) y = vsize - 1; - t.push_back({(off_t) x , y - x, (ssize_t) (y - x)}); + if (y >= vsize) + y = vsize - 1; + t.push_back({(off_t)x, y - x, (ssize_t)(y - x)}); } group_test_pread(t); } @@ -505,7 +651,6 @@ TEST_F(GzCacheTest, fstat) { EXPECT_EQ(static_cast(st.st_size), data_size); } - int main(int argc, char **argv) { auto seed = 154574045; std::cerr << "seed = " << seed << std::endl; @@ -516,5 +661,6 @@ int main(int argc, char **argv) { photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_DEFAULT); auto ret = RUN_ALL_TESTS(); photon::fini(); - if (ret) LOG_ERROR_RETURN(0, ret, VALUE(ret)); + if (ret) + LOG_ERROR_RETURN(0, ret, VALUE(ret)); } diff --git a/src/overlaybd/gzip/CMakeLists.txt b/src/overlaybd/gzip/CMakeLists.txt index 5de3ad02..b5555c87 100644 --- a/src/overlaybd/gzip/CMakeLists.txt +++ b/src/overlaybd/gzip/CMakeLists.txt @@ -4,7 +4,8 @@ add_library(gzip_lib STATIC ${SOURCE_GZIP}) target_include_directories(gzip_lib PUBLIC ${PHOTON_INCLUDE_DIR} ) -target_link_libraries(gzip_lib photon_static) +target_link_libraries(gzip_lib photon_static checksum_lib) + # if(BUILD_TESTING) # add_subdirectory(test) # endif() diff --git a/src/overlaybd/gzip/gz.cpp b/src/overlaybd/gzip/gz.cpp index 0f524c35..0a03d56d 100644 --- a/src/overlaybd/gzip/gz.cpp +++ b/src/overlaybd/gzip/gz.cpp @@ -15,15 +15,25 @@ */ #include "gz.h" +#include #include #include +#include #include #include - - -class GzAdaptorFile: public photon::fs::VirtualReadOnlyFile { +#include +#include +#include +#include "../../tools/sha256file.h" +#include "../gzindex/gzfile_index.h" +#include "photon/common/alog.h" +#include "photon/fs/localfs.h" +class GzAdaptorFile : public photon::fs::VirtualReadOnlyFile { public: - GzAdaptorFile(gzFile gzf): m_gzf(gzf) {} + GzAdaptorFile() { + } + GzAdaptorFile(gzFile gzf) : m_gzf(gzf) { + } ~GzAdaptorFile() { gzclose(m_gzf); } @@ -39,12 +49,9 @@ class GzAdaptorFile: public photon::fs::VirtualReadOnlyFile { int fstat(struct stat *buf) override { return 0; } -private: - gzFile m_gzf; - char m_buf[1024*1024]; - int m_cur = 0, m_left = 0; + int load_data() { - auto rc = gzread(m_gzf, m_buf, 1024*1024); + auto rc = gzread(m_gzf, m_buf, 1024 * 1024); if (rc < 0) { LOG_ERRNO_RETURN(0, -1, "failed to gzread"); } @@ -53,11 +60,238 @@ class GzAdaptorFile: public photon::fs::VirtualReadOnlyFile { LOG_INFO(VALUE(rc)); return rc; } + // private: + gzFile m_gzf; + char m_buf[1024 * 1024]; + int m_cur = 0, m_left = 0; }; -photon::fs::IFile* open_gzfile_adaptor(const char *path) { +class GzStreamFile : public IGzFile { +public: + GzStreamFile(IStream *sock, ssize_t st_size, bool index_save, + const char* uid, const char *_workdir) + : st_size(st_size), fstream(sock), workdir(_workdir){ + + if (uid == nullptr) { + timeval now; + gettimeofday(&now, NULL); + char suffix[32]{}; + sprintf(suffix, ".%lu", now.tv_sec*1000000 + now.tv_usec); + fn_idx = fn_idx + suffix; + fn_buff = fn_buff + suffix; + } else { + fn_idx = fn_idx + uid; + fn_buff = fn_buff + uid; + } + + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + strm.avail_in = 0; + strm.next_in = Z_NULL; + // ret = inflateInit(&strm); + inflateInit2(&strm, 47); + ttin = ttout = strm.avail_out = 0; + init_index_header(this, m_idx_header, GZ_CHUNK_SIZE, GZ_DICT_COMPERSS_ALGO, + GZ_COMPRESS_LEVEL); + m_indexes.clear(); + m_lfs = photon::fs::new_localfs_adaptor(workdir.c_str()); + LOG_INFO("create buffer file(`) and indexfile(`)", fn_buff, fn_idx); + if (index_save) { + m_idx_file = m_lfs->open(fn_idx.c_str(), O_TRUNC | O_CREAT | O_RDWR, 0644); + m_idx_filter = new_index_filter(&m_idx_header, &m_indexes, m_idx_file); + fstream = new_sha256_file((IFile*)fstream, false); + } + + buffer_file = photon::fs::open_localfile_adaptor(fn_buff.c_str(), O_TRUNC | O_CREAT | O_RDWR, 0644); + LOG_INFO("create a GzStreamFile. workdir: `", workdir); + }; + + ~GzStreamFile() { + (void)inflateEnd(&strm); + delete m_idx_file; + delete buffer_file; + delete_index_filter(m_idx_filter); + delete fstream; + for (auto it:m_indexes) { + delete it; + } + m_lfs->unlink(fn_buff.c_str()); + delete m_lfs; + } + + UNIMPLEMENTED_POINTER(photon::fs::IFileSystem* filesystem() override); + + virtual off_t lseek(off_t offset, int whence) override { + if (whence == SEEK_END) { + return st_size - offset; + } + if (whence == SEEK_CUR) { + assert(offset >= 0); + char buf[32768]; + while (offset > 0) { + auto len = offset; + if (len > (off_t)sizeof(buf)) { + len = sizeof(buf); + } + auto readn = this->read(buf, len); + if (readn <= 0){ + LOG_ERRNO_RETURN(EIO, -1, "read buffer error"); + } + offset -= readn; + } + return cur_offset; + } + LOG_ERRNO_RETURN(ESPIPE, -1, "unimplemented in GzStreamFile"); + } + virtual int fstat(struct stat *buf) override { + buf->st_size = st_size; + return 0; + } + + virtual ssize_t read(void *buf, size_t count) override { + size_t n = 0; + LOG_DEBUG("count: `", count); + while (count > 0) { + if (bf_len) { + auto delta = ((size_t)bf_len > count ? count : bf_len); + count -= delta; + auto readn = buffer_file->read(buf, delta); + LOG_DEBUG("copy ` bytes to ` from buffer.", readn, VALUE(buf)); + assert(readn == (ssize_t)delta); + buf = (char *)buf + delta; + bf_len -= delta; + n += delta; + continue; + } + LOG_DEBUG("trucate buffer file."); + buffer_file->ftruncate(0); + buffer_file->lseek(0, SEEK_SET); + strm.avail_in = fstream->read(in, CHUNK); + if (strm.avail_in < 0) { + LOG_ERRNO_RETURN(0, -1, "read buffer from uds failed"); + } + if (strm.avail_in == 0) + break; + if (!check_type) { + if (!((uint8_t)in[0] == 0x1f && (uint8_t)in[1] == 0x8b)){ + LOG_ERRNO_RETURN(EIO, -1, "buffer is not gzip type"); + } + check_type = true; + } + LOG_DEBUG("recv: `", strm.avail_in); + st_size += strm.avail_in; + strm.next_in = in; + int ret = 0; + bf_start = 0; + bf_len = 0; + do { + if (strm.avail_out == 0) { + strm.avail_out = CHUNK; + strm.next_out = out; + } + ttin += strm.avail_in; + auto prev = strm.avail_out; + auto copied = out + CHUNK - prev; + ttout += strm.avail_out; + ret = inflate(&strm, Z_BLOCK); + assert(ret != Z_STREAM_ERROR); /* state not clobbered */ + switch (ret) { + case Z_NEED_DICT: + ret = Z_DATA_ERROR; /* and fall through */ + case Z_DATA_ERROR: + case Z_MEM_ERROR: + (void)inflateEnd(&strm); + LOG_ERRNO_RETURN(0, -1, "zlib error: `", zError(ret)); + } + ttin -= strm.avail_in; + ttout -= strm.avail_out; + have = prev - strm.avail_out; + auto delta = (have > count ? count : have); + assert(n + delta <= 65536); + memcpy((char *)buf, copied, delta); + buf = (char *)buf + delta; + n += delta; + LOG_DEBUG("` bytes copied to `", delta, VALUE(buf)); + if (have > count) { + LOG_DEBUG("` bytes bufferd", have - delta); + buffer_file->write(copied + delta, have - delta); + bf_len += (have - delta); + } + if (ret == Z_STREAM_END) { + m_idx_header.uncompress_file_size = ttout; + // TODO Here generate crc32 for last uncompressed data block + // break; + } else if (create_index_entry(strm, m_idx_filter, ttin, ttout, out) != 0) { + LOG_ERRNO_RETURN(ret, -1, "Failed to add_index_entry"); + } + count -= delta; + } while (strm.avail_in != 0); + buffer_file->lseek(0, SEEK_SET); + } + cur_offset += n; + LOG_DEBUG("current offset: `", cur_offset); + return n; + } + + virtual std::string sha256_checksum() override { + if (sha256sum.empty()) { + sha256sum = ((SHA256File*)fstream)->sha256_checksum(); + } + return sha256sum; + } + + virtual std::string save_index() override { + if (save_index_to_file(m_idx_header, m_indexes, m_idx_file, st_size) != 0){ + LOG_ERRNO_RETURN(0, "", "save index failed"); + } + auto dst_file = this->sha256_checksum() + ".gz_idx"; + LOG_INFO("save index as: `", dst_file); + m_lfs->rename(fn_idx.c_str(), dst_file.c_str()); + return std::string(workdir) + "/" + dst_file; + } + ssize_t st_size = 0; + IFile *m_file = nullptr; + photon::fs::IFileSystem *m_fs = nullptr; + bool check_type = false; + + const static int CHUNK = 32768; + IStream *fstream; + z_stream strm; + Byte in[CHUNK]{}, out[CHUNK]{}; // buffer[10485760]; + /* allocate inflate state */ + size_t have = 0; + off_t ttin, ttout; + off_t bf_start = 0, bf_len = 0; + + off_t cur_offset = 0; + photon::fs::IFileSystem *m_lfs = nullptr; + IFile *buffer_file = nullptr; + IFile *m_idx_file = nullptr; + // const char *FN_BUFF_PREFIX = "/tmp/decompbuffer"; + // const char *FN_IDX_PREFIX = "/tmp/gzidx"; + std::string workdir; + std::string fn_buff = "decomp_buffer"; + std::string fn_idx = "gz_idx"; + std::string sha256sum = ""; + IndexFilterRecorder *m_idx_filter = nullptr; + INDEX m_indexes; + IndexFileHeader m_idx_header; +}; + +photon::fs::IFile *open_gzfile_adaptor(const char *path) { gzFile gzf = gzopen(path, "r"); if (gzf == nullptr) LOG_ERRNO_RETURN(0, nullptr, "failed to open gzip file ", VALUE(path)); return new GzAdaptorFile(gzf); } + +IGzFile *open_gzstream_file(IStream *sock, ssize_t st_size, + bool save_idx, const char *uid, const char *workdir) { + char buffer[1024]{}; + if (workdir == nullptr) { + getcwd(buffer, sizeof(buffer)); + workdir = (char*)buffer; + } + return new GzStreamFile(sock, st_size, true, uid, workdir); +} diff --git a/src/overlaybd/gzip/gz.h b/src/overlaybd/gzip/gz.h index 5496b7f1..5c8bcc1d 100644 --- a/src/overlaybd/gzip/gz.h +++ b/src/overlaybd/gzip/gz.h @@ -18,5 +18,16 @@ #include #include +#include +#include + +class IGzFile : public photon::fs::VirtualReadOnlyFile { +public : + // return full filename of gzip index + virtual std::string save_index() = 0; + virtual std::string sha256_checksum() = 0; +}; photon::fs::IFile* open_gzfile_adaptor(const char *path); +IGzFile* open_gzstream_file(IStream *sock, ssize_t st_size, + bool save_index = true, const char *uid = nullptr, const char *workdir = nullptr); diff --git a/src/overlaybd/stream_convertor/CMakeLists.txt b/src/overlaybd/stream_convertor/CMakeLists.txt new file mode 100644 index 00000000..d0465e51 --- /dev/null +++ b/src/overlaybd/stream_convertor/CMakeLists.txt @@ -0,0 +1,18 @@ +file(GLOB SOURCE_SERV "*.cpp") + +add_executable(overlaybd-streamConv ${SOURCE_SERV}) +target_include_directories(overlaybd-streamConv PUBLIC + ${PHOTON_INCLUDE_DIR} + ${rapidjson_SOURCE_DIR}/include +) +target_link_libraries(overlaybd-streamConv + photon_static + gzip_lib + gzindex_lib + tar_lib + yaml-cpp +) + +# if(BUILD_TESTING) +# add_subdirectory(test) +# endif() diff --git a/src/overlaybd/stream_convertor/config.h b/src/overlaybd/stream_convertor/config.h new file mode 100644 index 00000000..ea83242a --- /dev/null +++ b/src/overlaybd/stream_convertor/config.h @@ -0,0 +1,51 @@ +/* + Copyright The Overlaybd Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +#pragma once + +#include +#include +#include "config_utils.h" + +namespace App { + +struct LogConfigPara : public App::ConfigGroup { + APPCFG_CLASS + APPCFG_PARA(level, uint32_t, 1); + APPCFG_PARA(path, std::string, "/var/log/overlaybd/stream-convertor.log"); + APPCFG_PARA(limitSizeMB, uint32_t, 10); + APPCFG_PARA(rotateNum, int, 3); + APPCFG_PARA(mode, std::string, "stdout"); +}; + +struct GlobalConfigPara : public App::ConfigGroup { + APPCFG_CLASS; + APPCFG_PARA(udsAddr, std::string, ""); + APPCFG_PARA(httpAddr, std::string, "127.0.0.1"); + APPCFG_PARA(httpPort, int, 9101); + APPCFG_PARA(reusePort, bool, true); + + APPCFG_PARA(workDir, std::string, "/tmp/stream_conv"); + //APPCFG_PARA(ServerConfig, ServerConfigPara); + APPCFG_PARA(logConfig, LogConfigPara); +}; + +struct AppConfig : public App::ConfigGroup { + APPCFG_CLASS + + APPCFG_PARA(globalConfig, GlobalConfigPara); +}; + +} // namespace ImageConfigNS diff --git a/src/overlaybd/stream_convertor/config_utils.h b/src/overlaybd/stream_convertor/config_utils.h new file mode 100644 index 00000000..40b87f5e --- /dev/null +++ b/src/overlaybd/stream_convertor/config_utils.h @@ -0,0 +1,84 @@ +#pragma once +#include +#include +#include +#include +#include +#include + +namespace App { + +struct ConfigGroup : public YAML::Node { + ConfigGroup() = default; + ConfigGroup(const YAML::Node &node) : YAML::Node(node){}; + ConfigGroup(const std::string &filename) { + parseYAML(std::move(filename)); + } + + void parseYAML(const std::string &fn) { + Clone(YAML::LoadFile(fn)); + } + + static size_t charfilter(char *dst, const char *src, char extract, size_t maxlen = 256UL) { + size_t i; + for (i = 0; (*src) && i < maxlen; i++, src++) { + while (*src == '-') { + src++; + } + if (!(*src)) + break; + dst[i] = *(src); + } + dst[i] = 0; + return i; + } +}; + +#define APPCFG_PARA(ParaName, ParaType, ...) \ + inline ParaType ParaName() const { \ + return operator[](#ParaName).as(__VA_ARGS__); \ + } + +#define APPCFG_CLASS using ConfigGroup::ConfigGroup; + +// merge two yaml nodes +// generate new node with full data +static YAML::Node mergeConfig(const YAML::Node &lhs, const YAML::Node &rhs) { + // if one of nodes is not type of map + // just return rhs + if (lhs.Type() != YAML::NodeType::Map || rhs.Type() != YAML::NodeType::Map) + return YAML::Clone(rhs); + // both are map, merge two maps + YAML::Node ret = YAML::Clone(lhs); + for (auto &node : rhs) { + auto key = node.first.as(); + if (ret[key].IsDefined()) { + // if key exists in lhs, merge recursivily + ret[key] = mergeConfig(lhs[key], node.second); + } else { + // just add as new key + ret[key] = Clone(node.second); + } + } + return ret; +} + +} // namespace App + +namespace YAML { + +template +struct convert { + template + static Node encode(const T &rhs) { + return rhs; + } + + template + static bool decode(const Node &node, T &rhs) { + rhs = T(node); + return true; + } +}; + +} // namespace YAML \ No newline at end of file diff --git a/src/overlaybd/stream_convertor/stream_conv.cpp b/src/overlaybd/stream_convertor/stream_conv.cpp new file mode 100644 index 00000000..46d6d2b0 --- /dev/null +++ b/src/overlaybd/stream_convertor/stream_conv.cpp @@ -0,0 +1,265 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "config.h" +#include "../gzip/gz.h" +#include "../tar/libtar.h" +#include "../../tools/sha256file.h" +#include "photon/common/alog.h" +#include "photon/net/http/server.h" +#include "yaml-cpp/node/node.h" + + +App::AppConfig gconfig; + +using namespace std; +using namespace photon; +using namespace photon::net; + +class StreamConvertor { +public: + StreamConvertor(App::AppConfig &config) { + workdir = config.globalConfig().workDir(); + serv_addr = config.globalConfig().udsAddr(); + } + + struct Task { + Task() { + auto now = std::chrono::system_clock::now(); + uint64_t us = + std::chrono::duration_cast(now.time_since_epoch()).count(); + task_id = std::to_string(us) + "." + std::to_string(rand() % 1000000); + } + string task_id; + int status; + string msg; + }; + + static int get_task_id(Task &t) { + + return 0; + } + bool valid_request(http::Request &req, http::Response &resp) { + auto body_size = req.headers.content_length(); + LOG_DEBUG("body size: `", body_size); + if (body_size > 0) { + return true; + } + string str = "{\"code\": -1, \"message\": \"invalid body size(0)\"}\n"; + resp.headers.content_length(str.size()); + resp.write((void*)str.data(), str.size()); + return false; + } + + int gen_meta(http::Request &req, http::Response &resp, std::string_view){ + + Task t; + auto uuid = req.headers.get_value("UUID").to_string(); + resp.set_result(200); + if (!valid_request(req, resp)) { + return 0; + } + string msg = string("{\"code\": 0, \"message\":") + uuid + "\"\"}\n"; + if (do_task(&req, t) != 0){ + msg = "failed"; + } + resp.headers.content_length(msg.size()); + resp.write((void*)msg.data(), msg.size()); + return 0; + } + + int do_task(IStream *sock, Task &t) { + + struct stat st; + string filename=""; + SCOPE_AUDIT("gen_meta", AU_FILEOP(filename.c_str(), 0, st.st_size)); + + auto start = std::chrono::steady_clock::now(); + LOG_DEBUG("Accepted"); + auto streamfile = open_gzstream_file(sock, 0, true, t.task_id.c_str(), workdir.c_str()); + DEFER(delete streamfile); + auto turboOCI_stream = new UnTar(streamfile, nullptr, 0, 4096, nullptr, true); + DEFER(delete turboOCI_stream); + auto fn_tar_idx = t.task_id + ".tar.meta"; + auto tar_idx = m_fs->open(fn_tar_idx.c_str(), O_TRUNC | O_CREAT | O_RDWR, 0644); + DEFER(delete tar_idx); + auto nitems = turboOCI_stream->dump_tar_headers(tar_idx); + if (nitems < 0) { + LOG_ERROR_RETURN(0, -1, "invalid buffer received."); + } + LOG_INFO("` items get in `", nitems, fn_tar_idx); + + streamfile->fstat(&st); + auto fn_gz_idx = streamfile->save_index(); + auto dst_tar_idx = streamfile->sha256_checksum() + ".tar.meta"; + if (m_fs->rename(fn_tar_idx.c_str(), dst_tar_idx.c_str()) != 0) { + LOG_ERROR("rename metafile (` --> `) failed.", fn_tar_idx, dst_tar_idx); + } + LOG_INFO("save meta success. {gz_idx: `, tar_meta: `} ", fn_gz_idx, dst_tar_idx); + auto end = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(end - start); + auto decomp_speed = (double)st.st_size / 1000 / elapsed.count(); + LOG_INFO("task ` finish. {time_elapsed: `ms, decode_speed: `MB/s}", + t.task_id, elapsed.count(), decomp_speed); + filename = dst_tar_idx; + + return 0; + } + + + + void serve(photon::net::ISocketStream *sock) { + + Task t; + auto ret = do_task(sock, t); + if (ret != 0) { + sock->close(); + } + } + + int start() { + srand(time(NULL)); + if (::access(workdir.c_str(), 0)) { + if (mkdir(workdir.c_str(), 0755) != 0){ + LOG_ERRNO_RETURN(0, -1, "create workdir failed."); + } + } + + std::string httpAddr = gconfig.globalConfig().httpAddr(); + m_tcp_serv = new_tcp_socket_server(); + m_tcp_serv->timeout(1000UL*1000); + if ( gconfig.globalConfig().reusePort() ){ + m_tcp_serv->setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); + } + if (m_tcp_serv->bind(gconfig.globalConfig().httpPort(), IPAddr(httpAddr.c_str())) != 0) { + LOG_ERRNO_RETURN(0, -1, "Failed to bind to port ", + gconfig.globalConfig().httpPort()); + }; + if (m_tcp_serv->listen() != 0) { + LOG_ERRNO_RETURN(0, -1, "Failed to listen socket"); + } + + m_fs = photon::fs::new_localfs_adaptor(workdir.c_str()); + auto uds_handler = [&](photon::net::ISocketStream *s) -> int { + LOG_INFO("Accept UDS"); + this->serve(s); + return 0; + }; + m_http_serv = http::new_http_server(); + m_http_serv->add_handler({this, &StreamConvertor::gen_meta}, "/generateMeta"); + m_tcp_serv->set_handler(m_http_serv->get_connection_handler()); + + if (not gconfig.globalConfig().udsAddr().empty()){ + m_uds_serv = photon::net::new_uds_server(true); + LOG_INFO("try to bind: `", serv_addr); + if (m_uds_serv->bind(serv_addr.c_str())|| m_uds_serv->listen(100)) { + LOG_ERRNO_RETURN(0, -1, "bind sock addr failed."); + } + char path[256]{}; + if ((m_uds_serv->getsockname(path, 256) < 0) || (strcmp(path, serv_addr.c_str()) != 0)) { + LOG_ERRNO_RETURN(0, -1, "get socket name error. ['`' != '`'(expected)]", path, + serv_addr); + } + LOG_INFO("uds server listening `", path); + m_uds_serv->set_handler(uds_handler); + m_uds_serv->start_loop(); + } + m_tcp_serv->start_loop(true); + + return 0; + } + + int stop() { + delete m_fs; + if (m_uds_serv) { + m_uds_serv->terminate(); + delete m_uds_serv; + } + if (m_tcp_serv) { + m_tcp_serv->terminate(); + + delete m_tcp_serv; + delete m_http_serv; + } + return 0; + } + + photon::fs::IFileSystem *m_fs = nullptr; + photon::net::ISocketServer *m_uds_serv = nullptr, *m_tcp_serv = nullptr; + photon::net::http::HTTPServer *m_http_serv = nullptr; + + std::string serv_addr; + std::string workdir; +} * server; + +static void stop_by_signal(int signal) { + LOG_INFO("Got signal ", signal); + server->stop(); + LOG_INFO("server stopped"); +} + + +int set_log_config(){ + set_log_output_level(gconfig.globalConfig().logConfig().level()); + auto config = gconfig.globalConfig().logConfig(); + if (config.mode() == "file") { + LOG_INFO("redirect log into `, limitSize: `MB, rotateNums: `", + config.path(), config.limitSizeMB(), config.rotateNum()); + auto log_fn = config.path(); + default_logger.log_output = new_log_output_file(log_fn.c_str(), + config.limitSizeMB(), + config.rotateNum() + ); + if (!default_logger.log_output) { + default_logger.log_output = log_output_stdout; + } + } + + default_audit_logger.log_output = log_output_stdout; + return 0; +} + +int main(int argc, char *argv[]) { + mallopt(M_TRIM_THRESHOLD, 128 * 1024); + // prctl(PR_SET_THP_DISABLE, 1); + + // set_log_output_level(1); + photon::init(photon::INIT_EVENT_DEFAULT | photon::INIT_IO_DEFAULT | photon::INIT_EVENT_SIGNAL); + DEFER(photon::fini()); + DEFER(default_logger.log_output = log_output_null;); + + photon::block_all_signal(); + photon::sync_signal(SIGTERM, &stop_by_signal); + photon::sync_signal(SIGINT, &stop_by_signal); + photon::sync_signal(SIGTSTP, &stop_by_signal); + + auto cfg_path = argv[1]; + LOG_INFO("parsing config: `", cfg_path); + auto node = YAML::LoadFile(cfg_path); + gconfig = App::mergeConfig(gconfig, node); + if (gconfig.IsNull()) { + LOG_ERRNO_RETURN(0, -1, "parse config file failed."); + } + set_log_config(); + LOG_INFO("start server..."); + // photon::sync_signal(SIGPIPE, &ignore_signal); + // photon::sync_signal(SIGUSR2, &restart_by_signal); + server = new StreamConvertor(gconfig); + DEFER(delete server); + server->start(); + return 0; +} diff --git a/src/overlaybd/tar/libtar.cpp b/src/overlaybd/tar/libtar.cpp index 866391bb..9e06bc71 100644 --- a/src/overlaybd/tar/libtar.cpp +++ b/src/overlaybd/tar/libtar.cpp @@ -75,12 +75,18 @@ int UnTar::set_file_perms(const char *filename) { ssize_t UnTar::dump_tar_headers(photon::fs::IFile *as) { ssize_t count = 0; - while (read_header(as) == 0) { + while (true) { + auto next = read_header(as); + if (next == -1) { + return -1; + } else if (next == 1) { + break; + } + count++; if (TH_ISREG(header)) { auto size = get_size(); file->lseek(((size + T_BLOCKSIZE - 1) / T_BLOCKSIZE) * T_BLOCKSIZE, SEEK_CUR); // skip size } - count++; } return count; } diff --git a/src/overlaybd/tar/libtar.h b/src/overlaybd/tar/libtar.h index d373e706..4222d6ab 100644 --- a/src/overlaybd/tar/libtar.h +++ b/src/overlaybd/tar/libtar.h @@ -189,13 +189,14 @@ class UnTar : public TarCore { fs_base_file(bf), meta_only(meta_only), from_tar_idx(from_tar_idx){} int extract_all(); + // return number of objects in this tarfile ssize_t dump_tar_headers(photon::fs::IFile* file); private: photon::fs::IFileSystem *fs = nullptr; // target photon::fs::IFile *fs_base_file = nullptr; - bool meta_only; - bool from_tar_idx; + bool meta_only = false; + bool from_tar_idx = false; std::set unpackedPaths; std::list> dirs; // diff --git a/src/overlaybd/tar/test/CMakeLists.txt b/src/overlaybd/tar/test/CMakeLists.txt index 4740a9b8..b96ac109 100644 --- a/src/overlaybd/tar/test/CMakeLists.txt +++ b/src/overlaybd/tar/test/CMakeLists.txt @@ -7,7 +7,7 @@ link_directories($ENV{GTEST}/lib) add_executable(untar_test test.cpp) target_include_directories(untar_test PUBLIC ${PHOTON_INCLUDE_DIR}) target_link_libraries(untar_test gtest gtest_main pthread photon_static - tar_lib extfs_lib lsmt_lib) + tar_lib extfs_lib lsmt_lib gzip_lib gzindex_lib checksum_lib) add_test( NAME untar_test diff --git a/src/overlaybd/tar/test/test.cpp b/src/overlaybd/tar/test/test.cpp index 8a79e9c0..a9d85388 100644 --- a/src/overlaybd/tar/test/test.cpp +++ b/src/overlaybd/tar/test/test.cpp @@ -14,18 +14,21 @@ limitations under the License. */ -#include #include #include #include #include #include #include +#include #include +#include "../../gzindex/gzfile.h" #include "../../extfs/extfs.h" #include "../../lsmt/file.h" #include "../libtar.h" #include "../tar_file.cpp" +#include "../../gzip/gz.h" +#include "../../../tools/sha256file.h" #define FILE_SIZE (2 * 1024 * 1024) @@ -36,12 +39,12 @@ class TarTest : public ::testing::Test { fs = photon::fs::new_localfs_adaptor(); ASSERT_NE(nullptr, fs); - if (fs->access(base.c_str(), 0) != 0) { - auto ret = fs->mkdir(base.c_str(), 0755); + if (fs->access(workdir.c_str(), 0) != 0) { + auto ret = fs->mkdir(workdir.c_str(), 0755); ASSERT_EQ(0, ret); } - fs = photon::fs::new_subfs(fs, base.c_str(), true); + fs = photon::fs::new_subfs(fs, workdir.c_str(), true); ASSERT_NE(nullptr, fs); } virtual void TearDown() override{ @@ -52,10 +55,26 @@ class TarTest : public ::testing::Test { delete fs; } - int download(const std::string &url) { + int download(const std::string &url, std::string out = "") { + if (out == "") { + out = workdir + "/" + std::string(basename(url.c_str())); + } + if (fs->access(out.c_str(), 0) == 0) + return 0; + // download file + std::string cmd = "curl -s -o " + out + " " + url; + LOG_INFO(VALUE(cmd.c_str())); + auto ret = system(cmd.c_str()); + if (ret != 0) { + LOG_ERRNO_RETURN(0, -1, "download failed: `", url.c_str()); + } + return 0; + } + + int download_decomp(const std::string &url) { // download file std::string cmd = "wget -q -O - " + url +" | gzip -d -c >" + - base + "/latest.tar"; + workdir + "/latest.tar"; LOG_INFO(VALUE(cmd.c_str())); auto ret = system(cmd.c_str()); if (ret != 0) { @@ -113,8 +132,9 @@ class TarTest : public ::testing::Test { ssize_t LEN = 1UL<<20; char vbuf[1UL<<20], tbuf[1UL<<20]; + // set_log_output_level(0); for (off_t i = 0; i < count; i+=LEN) { - // LOG_INFO("`", i); + LOG_DEBUG("`", i); auto ret_v = verify->pread(vbuf, LEN, i); auto ret_t = test->pread(tbuf, LEN, i); if (ret_v == -1 || ret_t == -1) { @@ -132,14 +152,14 @@ class TarTest : public ::testing::Test { return 0; } - std::string base = "/tmp/tar_test"; + std::string workdir = "/tmp/tar_test"; photon::fs::IFileSystem *fs; std::vector filelist; }; // photon::fs::IFileSystem *TarTest::fs = nullptr; TEST_F(TarTest, untar) { - ASSERT_EQ(0, download("https://github.com/containerd/overlaybd/archive/refs/tags/latest.tar.gz")); + ASSERT_EQ(0, download_decomp("https://github.com/containerd/overlaybd/archive/refs/tags/latest.tar.gz")); auto tarf = fs->open("latest.tar", O_RDONLY, 0666); ASSERT_NE(nullptr, tarf); DEFER(delete tarf); @@ -157,8 +177,7 @@ TEST_F(TarTest, untar) { TEST_F(TarTest, tar_meta) { // set_log_output_level(0); - ASSERT_EQ(0, download("https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz")); - // ASSERT_EQ(0, download("https://github.com/containerd/overlaybd/archive/refs/tags/latest.tar.gz")); + ASSERT_EQ(0, download_decomp("https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz")); auto src_file = fs->open("latest.tar", O_RDONLY, 0666); ASSERT_NE(nullptr, src_file); @@ -178,7 +197,7 @@ TEST_F(TarTest, tar_meta) { auto tar_idx = fs->open("latest.tar.meta", O_TRUNC | O_CREAT | O_RDWR, 0644); auto imgfile = createDevice("mock", src_file); DEFER(delete imgfile;); - auto tar = new UnTar(src_file, nullptr, 0, 4096, nullptr, false); + auto tar = new UnTar(src_file, nullptr, 0, 4096, nullptr, true); auto obj_count = tar->dump_tar_headers(tar_idx); EXPECT_NE(-1, obj_count); LOG_INFO("objects count: `", obj_count); @@ -197,7 +216,118 @@ TEST_F(TarTest, tar_meta) { delete tar_idx; delete tar; - // EXPECT_EQ(0, ret); +} + +TEST_F(TarTest, stream) { + set_log_output_level(1); + std::string fn_test_tgz = "go1.17.6.linux-amd64.tar.gz"; + ASSERT_EQ( + 0, download("https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz", + "")); + set_log_output_level(0); + + for (int i = 0; i < 3; i++) { + auto src_file = fs->open(fn_test_tgz.c_str(), O_RDONLY, 0644); + struct stat st; + src_file->fstat(&st); + auto streamfile = open_gzstream_file(src_file, 0); + auto fn = ("/tmp/tar_test/" + fn_test_tgz); + ASSERT_NE(nullptr, src_file); + DEFER(delete src_file); + + auto turboOCI_stream = new UnTar(streamfile, nullptr, 0, 4096, nullptr, true); + DEFER(delete turboOCI_stream); + + auto tar_idx = fs->open("stream.tar.meta", O_TRUNC | O_CREAT | O_RDWR, 0644); + DEFER(delete tar_idx); + auto obj_count = turboOCI_stream->dump_tar_headers(tar_idx); + EXPECT_NE(-1, obj_count); + tar_idx->lseek(0, SEEK_SET); + auto tar_meta_sha256 = new_sha256_file(tar_idx, false); + DEFER(delete tar_meta_sha256); + ASSERT_STREQ(tar_meta_sha256->sha256_checksum().c_str(), "sha256:c5aaa64a1b70964758e190b88b3e65528607b0002bffe42513bc65ac6e65f337"); + auto idx_fn = streamfile->save_index(); + // auto idx_fn = "/tmp/test.idx"; + + // create_gz_index(src_file, idx_fn); + auto idx_sha256 = sha256sum(idx_fn.c_str()); + delete streamfile; + ASSERT_STREQ(idx_sha256.c_str(), "sha256:af3ffd4965d83f3d235c48ce75e16a1f2edf12d0e5d82816d7066a8485aade82"); + } +} + +TEST_F(TarTest, gz_tarmeta_e2e) { + // set_log_output_level(0); + std::vector filelist { + "https://dadi-shared.oss-cn-beijing.aliyuncs.com/cri-containerd-cni-1.5.2-linux-amd64.tar.gz", + "https://dadi-shared.oss-cn-beijing.aliyuncs.com/containerd-1.4.4-linux-amd64.tar.gz", + "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz" + }; + for (auto file : filelist){ + ASSERT_EQ(0, download(file.c_str())); + auto fn = std::string(basename(file.c_str())); + auto gzip_file = fs->open(fn.c_str(), O_RDONLY, 0600); + auto gzfile = open_gzfile_adaptor((workdir + "/" + fn).c_str()); + auto fn_idx = (workdir + "/" + fn + ".gz_idx"); + ASSERT_EQ(create_gz_index(gzip_file, fn_idx.c_str()), 0); + auto gz_idx = fs->open((fn + ".gz_idx").c_str(), O_RDONLY, 0644); + gzip_file->lseek(0, SEEK_SET); + auto src_file = new_gzfile(gzip_file, gz_idx, true); + ASSERT_NE(nullptr, src_file); + auto verify_dev = createDevice((fn + ".verify").c_str(), src_file); + make_extfs(verify_dev); + auto verify_ext4fs = new_extfs(verify_dev, false); + auto verifyfs = new_subfs(verify_ext4fs, "/", true); + // gzfile->lseek(0, SEEK_SET); + auto turboOCI_verify = new UnTar(gzfile, verifyfs, 0, 4096, verify_dev, true); + ASSERT_EQ(0, turboOCI_verify->extract_all()); + verify_ext4fs->sync(); + + // src_file->lseek(0, 0); + auto tar_idx = fs->open((fn + ".tar.meta").c_str(), O_TRUNC | O_CREAT | O_RDWR, 0644); + auto stream_src = fs->open(fn.c_str(), O_RDONLY, 0600); + auto streamfile = open_gzstream_file(stream_src, 0); + auto tar = new UnTar(streamfile, nullptr, 0, 4096, nullptr, true); + auto obj_count = tar->dump_tar_headers(tar_idx); + EXPECT_NE(-1, obj_count); + LOG_INFO("objects count: `", obj_count); + + auto fn_test_idx = streamfile->save_index(); + LOG_INFO("gzip index of [`]: `", fn, fn_test_idx); + auto test_gz_idx = open_localfile_adaptor(fn_test_idx.c_str(), O_RDONLY); + ASSERT_NE(test_gz_idx, nullptr); + auto test_gzfile = fs->open(fn.c_str(), O_RDONLY, 0600); + ASSERT_NE(test_gzfile, nullptr); + auto gz_target = new_gzfile(test_gzfile,test_gz_idx, true); + auto imgfile = createDevice((fn + ".mock").c_str(), gz_target); + + tar_idx->lseek(0,0); + + make_extfs(imgfile); + auto extfs = new_extfs(imgfile, false); + auto target = new_subfs(extfs, "/", true); + auto turboOCI_mock = new UnTar(tar_idx, target, TAR_IGNORE_CRC, 4096, imgfile, true, true); + auto ret = turboOCI_mock->extract_all(); + extfs->sync(); + + ASSERT_EQ(0, ret); + EXPECT_EQ(0, do_verify(verify_dev, imgfile)); + + delete turboOCI_mock; + delete target; + delete src_file; + delete gzfile; + delete turboOCI_verify; + delete verifyfs; + delete tar_idx; + delete stream_src; + delete streamfile; + delete tar; + + delete verify_dev; + delete imgfile; + } + } TEST_F(TarTest, tar_header_check) { diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 1a795311..68d56aaa 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -13,7 +13,7 @@ target_link_libraries(overlaybd-zfile photon_static overlaybd_lib) add_executable(overlaybd-apply overlaybd-apply.cpp comm_func.cpp) target_include_directories(overlaybd-apply PUBLIC ${PHOTON_INCLUDE_DIR} ${rapidjson_SOURCE_DIR}/include) -target_link_libraries(overlaybd-apply photon_static overlaybd_lib overlaybd_image_lib) +target_link_libraries(overlaybd-apply photon_static overlaybd_lib overlaybd_image_lib checksum_lib) set_target_properties(overlaybd-apply PROPERTIES INSTALL_RPATH "/opt/overlaybd/lib") add_executable(turboOCI-apply turboOCI-apply.cpp comm_func.cpp) @@ -21,6 +21,9 @@ target_include_directories(turboOCI-apply PUBLIC ${PHOTON_INCLUDE_DIR} ${rapidjs target_link_libraries(turboOCI-apply photon_static overlaybd_lib overlaybd_image_lib) set_target_properties(turboOCI-apply PROPERTIES INSTALL_RPATH "/opt/overlaybd/lib") +add_library(checksum_lib sha256file.cpp) +target_include_directories(checksum_lib PUBLIC ${PHOTON_INCLUDE_DIR}) +target_link_libraries(checksum_lib photon_static) install(TARGETS overlaybd-commit diff --git a/src/tools/comm_func.cpp b/src/tools/comm_func.cpp index 28010a54..8be821c7 100644 --- a/src/tools/comm_func.cpp +++ b/src/tools/comm_func.cpp @@ -18,7 +18,7 @@ #include "../overlaybd/tar/tar_file.h" #include #include - +#include #include "../overlaybd/extfs/extfs.h" #include "../image_service.h" #include "../image_file.h" diff --git a/src/tools/comm_func.h b/src/tools/comm_func.h index 0db7c70f..d013717c 100644 --- a/src/tools/comm_func.h +++ b/src/tools/comm_func.h @@ -30,11 +30,9 @@ #include "../image_service.h" #include "../image_file.h" #include -#include #include #include #include "CLI11.hpp" -#include int generate_option(CLI::App &app); @@ -45,53 +43,3 @@ int create_overlaybd(const std::string &srv_config, const std::string &dev_confi photon::fs::IFileSystem *create_ext4fs(photon::fs::IFile *imgfile, bool mkfs, bool enable_buffer, const char* root); - -class SHA256CheckedFile: public VirtualReadOnlyFile { -public: - IFile *m_file; - SHA256_CTX ctx = {0}; - size_t total_read = 0; - - SHA256CheckedFile(IFile *file): m_file(file) { - SHA256_Init(&ctx); - } - ~SHA256CheckedFile() { - delete m_file; - } - virtual IFileSystem *filesystem() override { - return nullptr; - } - ssize_t read(void *buf, size_t count) override { - auto rc = m_file->read(buf, count); - if (rc > 0 && SHA256_Update(&ctx, buf, rc) < 0) { - LOG_ERROR("sha256 calculate error"); - return -1; - } - return rc; - } - off_t lseek(off_t offset, int whence) override { - return m_file->lseek(offset, whence); - } - std::string sha256_checksum() { - // read trailing data - char buf[64*1024]; - auto rc = m_file->read(buf, 64*1024); - if (rc == 64*1024) { - LOG_WARN("too much trailing data"); - } - if (rc > 0 && SHA256_Update(&ctx, buf, rc) < 0) { - LOG_ERROR("sha256 calculate error"); - return ""; - } - // calc sha256 result - unsigned char sha[32]; - SHA256_Final(sha, &ctx); - char res[SHA256_DIGEST_LENGTH * 2]; - for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) - sprintf(res + (i * 2), "%02x", sha[i]); - return "sha256:" + std::string(res, SHA256_DIGEST_LENGTH * 2); - } - int fstat(struct stat *buf) override { - return m_file->fstat(buf); - } -}; diff --git a/src/tools/overlaybd-apply.cpp b/src/tools/overlaybd-apply.cpp index ed2c6bc9..54fe5a34 100644 --- a/src/tools/overlaybd-apply.cpp +++ b/src/tools/overlaybd-apply.cpp @@ -39,6 +39,7 @@ #include "../image_file.h" #include "CLI11.hpp" #include "comm_func.h" +#include "sha256file.h" using namespace std; using namespace photon::fs; @@ -87,7 +88,7 @@ int main(int argc, char **argv) { DEFER({ delete target; }); photon::fs::IFile* src_file = nullptr; - SHA256CheckedFile* checksum_file = nullptr; + SHA256File* checksum_file = nullptr; auto tarf = open_file(input_path.c_str(), O_RDONLY, 0666); DEFER(delete tarf); @@ -104,7 +105,7 @@ int main(int argc, char **argv) { } if (!sha256_checksum.empty()) { - src_file = checksum_file = new SHA256CheckedFile(src_file); + src_file = checksum_file = new_sha256_file(src_file, true); } photon::fs::IFile* base_file = raw ? nullptr : ((ImageFile *)imgfile)->get_base(); diff --git a/src/tools/sha256file.cpp b/src/tools/sha256file.cpp new file mode 100644 index 00000000..2878db8e --- /dev/null +++ b/src/tools/sha256file.cpp @@ -0,0 +1,110 @@ +#include +#include +#include "sha256file.h" +#include +#include +#include +#include +#include + +using namespace photon::fs; +using namespace std; + +class SHA256CheckedFile: public SHA256File { +public: + IFile *m_file; + SHA256_CTX ctx = {0}; + size_t total_read = 0; + bool m_ownership = false; + + SHA256CheckedFile(IFile *file, bool ownership): m_file(file), m_ownership(ownership) { + SHA256_Init(&ctx); + } + ~SHA256CheckedFile() { + if (m_ownership) delete m_file; + } + virtual IFileSystem *filesystem() override { + return nullptr; + } + ssize_t read(void *buf, size_t count) override { + auto rc = m_file->read(buf, count); + if (rc > 0 && SHA256_Update(&ctx, buf, rc) < 0) { + LOG_ERROR("sha256 calculate error"); + return -1; + } + return rc; + } + off_t lseek(off_t offset, int whence) override { + return m_file->lseek(offset, whence); + } + virtual std::string sha256_checksum() override{ + // read trailing data + char buf[64*1024]; + auto rc = m_file->read(buf, 64*1024); + while (rc > 0) { + // if (rc == 64*1024) { + // LOG_WARN("too much trailing data"); + // } + if (rc > 0 && SHA256_Update(&ctx, buf, rc) < 0) { + LOG_ERROR("sha256 calculate error"); + return ""; + } + rc = m_file->read(buf, 64*1024); + } + // calc sha256 result + unsigned char sha[32]; + SHA256_Final(sha, &ctx); + char res[SHA256_DIGEST_LENGTH * 2]; + for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) + sprintf(res + (i * 2), "%02x", sha[i]); + return "sha256:" + std::string(res, SHA256_DIGEST_LENGTH * 2); + } + int fstat(struct stat *buf) override { + return m_file->fstat(buf); + } +}; + +SHA256File *new_sha256_file(IFile *file, bool ownership = true) { + return new SHA256CheckedFile(file, ownership); +} + +string sha256sum(const char *fn) { + constexpr size_t BUFFERSIZE = 65536; + // auto file = open_localfile_adaptor(fn, O_RDONLY | O_DIRECT); + // auto sha256file = new_sha256_file(file, true); + // DEFER(delete sha256file); + // return sha256file->sha256_checksum(); + int fd = open(fn, O_RDONLY | O_DIRECT); + if (fd < 0) { + LOG_ERROR("failed to open `", fn); + return ""; + } + DEFER(close(fd);); + + struct stat stat; + if (::fstat(fd, &stat) < 0) { + LOG_ERROR("failed to stat `", fn); + return ""; + } + SHA256_CTX ctx = {0}; + SHA256_Init(&ctx); + __attribute__((aligned(ALIGNMENT_4K))) char buffer[65536]; + unsigned char sha[32]; + ssize_t recv = 0; + for (off_t offset = 0; offset < stat.st_size; offset += BUFFERSIZE) { + recv = pread(fd, &buffer, BUFFERSIZE, offset); + if (recv < 0) { + LOG_ERROR("io error: `", fn); + return ""; + } + if (SHA256_Update(&ctx, buffer, recv) < 0) { + LOG_ERROR("sha256 calculate error: `", fn); + return ""; + } + } + SHA256_Final(sha, &ctx); + char res[SHA256_DIGEST_LENGTH * 2]; + for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) + sprintf(res + (i * 2), "%02x", sha[i]); + return "sha256:" + std::string(res, SHA256_DIGEST_LENGTH * 2); +} diff --git a/src/tools/sha256file.h b/src/tools/sha256file.h new file mode 100644 index 00000000..cd8bf2fd --- /dev/null +++ b/src/tools/sha256file.h @@ -0,0 +1,14 @@ + +#include +#include +#include +#include + +class SHA256File : public photon::fs::VirtualReadOnlyFile { +public: + virtual std::string sha256_checksum() = 0; +}; + +SHA256File *new_sha256_file(photon::fs::IFile *file, bool ownership); + +std::string sha256sum(const char *fn); diff --git a/src/tools/turboOCI-apply.cpp b/src/tools/turboOCI-apply.cpp index 39c90836..41c82b42 100644 --- a/src/tools/turboOCI-apply.cpp +++ b/src/tools/turboOCI-apply.cpp @@ -38,7 +38,6 @@ #include "../image_service.h" #include "../image_file.h" #include "CLI11.hpp" -#include #include "comm_func.h" using namespace std;