diff --git a/CMakeLists.txt b/CMakeLists.txt index 798f86ea..161af39f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,6 +42,16 @@ set(ENABLE_MIMIC_VDSO off) find_package(photon REQUIRED) find_package(tcmu REQUIRED) +find_package(yamlcpp) +if (NOT yamlcpp_FOUND) + FetchContent_Declare( + yamlcpp + GIT_REPOSITORY https://github.com/jbeder/yaml-cpp.git + GIT_TAG 0.8.0 + ) + FetchContent_MakeAvailable(yamlcpp) +endif() + if(BUILD_TESTING) enable_testing() diff --git a/src/example_config/stream-conv.yaml b/src/example_config/stream-conv.yaml new file mode 100644 index 00000000..f56c147b --- /dev/null +++ b/src/example_config/stream-conv.yaml @@ -0,0 +1,19 @@ +# // { +# // "serverConfig": { +# // "uds": "/var/run/stream_conv.sock", +# // "workdir": "/tmp/stream_conv" +# // }, +# // "logConfig": { +# // "level": 0, +# // "mode": "stdout" +# // } +# // } +globalConfig: + servAddr: /var/run/stream_conv.sock + workDir: /tmp/stream_conv + logConfig: + level: 0 + mode: stdout + rotateNum: 3 + limitSizeMB: 10 + path: /var/log/overlaybd/stream_convertor.log \ No newline at end of file diff --git a/src/overlaybd/gzindex/gzfile.cpp b/src/overlaybd/gzindex/gzfile.cpp index 14a1f893..55af02da 100644 --- a/src/overlaybd/gzindex/gzfile.cpp +++ b/src/overlaybd/gzindex/gzfile.cpp @@ -355,7 +355,6 @@ ssize_t GzFile::pread(void *buf, size_t count, off_t offset) { if (p == nullptr) { LOG_ERRNO_RETURN(0, -1, "Failed to seek_index(,`)", offset); } - //LOG_DEBUG("offset:`, index->de_pos:", offset, p->de_pos+0); return extract(p, offset, (unsigned char*)buf, count); } diff --git a/src/overlaybd/gzindex/test/test.cpp b/src/overlaybd/gzindex/test/test.cpp index 6d598c15..225cba0d 100644 --- a/src/overlaybd/gzindex/test/test.cpp +++ b/src/overlaybd/gzindex/test/test.cpp @@ -206,7 +206,7 @@ int download(const std::string &url, const std::string &out) { // return 0; auto base = std::string(basename(url.c_str())); // download file - std::string cmd = "curl -s -o " + out + " " + url; + std::string cmd = "curl -sL -o " + out + " " + url; LOG_INFO(VALUE(cmd.c_str())); auto ret = system(cmd.c_str()); if (ret != 0) { @@ -296,15 +296,19 @@ TEST_F(GzIndexTest, stream) { auto lfs = photon::fs::new_localfs_adaptor(workdir.c_str()); LOG_INFO("start streamFile test"); std::vector filelist = { - "https://dadi-shared.oss-cn-beijing.aliyuncs.com/cri-containerd-cni-1.5.2-linux-amd64.tar.gz", - "https://dadi-shared.oss-cn-beijing.aliyuncs.com/containerd-1.4.4-linux-amd64.tar.gz", - "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.13.linux-amd64.tar.gz", - "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz" + // "https://dadi-shared.oss-cn-beijing.aliyuncs.com/cri-containerd-cni-1.5.2-linux-amd64.tar.gz", + "https://github.com/containerd/containerd/releases/download/v1.5.17/cri-containerd-cni-1.5.17-linux-amd64.tar.gz", + "https://github.com/containerd/containerd/releases/download/v1.4.4/containerd-1.4.4-linux-amd64.tar.gz", + // "https://dadi-shared.oss-cn-beijing.aliyuncs.com/containerd-1.4.4-linux-amd64.tar.gz", + // "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.13.linux-amd64.tar.gz", + "https://go.dev/dl/go1.17.6.linux-amd64.tar.gz", + // "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz" }; std::vector tar_sha256sum = { - "sha256:05e8b01c1ddb6ba4f8c84e7dbc76529bdc09861f9ce17c213a49e8c334f184ed", + "sha256:02adc5074f59777d2ca74c8a0291659f69291865184c987d9c10e58f58b162c2", + // "sha256:05e8b01c1ddb6ba4f8c84e7dbc76529bdc09861f9ce17c213a49e8c334f184ed", "sha256:0ccf983abf0b0fb64cc969079982bc34761ce22d7a3236a40d49d840d150e09a", - "sha256:1041ec4e2f40156e0731be175388be4c67aeceb44829f988df213e9fd5f26dc9", + // "sha256:1041ec4e2f40156e0731be175388be4c67aeceb44829f988df213e9fd5f26dc9", "sha256:562688d70dcd1596556e7c671c1266f6e9c22b4f4fb8344efa8bed88fc2bac7b" }; int i = 0; diff --git a/src/overlaybd/gzip/gz.cpp b/src/overlaybd/gzip/gz.cpp index 18bf15a5..401c65ab 100644 --- a/src/overlaybd/gzip/gz.cpp +++ b/src/overlaybd/gzip/gz.cpp @@ -26,6 +26,7 @@ #include #include "../../tools/sha256file.h" #include "../gzindex/gzfile_index.h" +#include "photon/common/alog.h" class GzAdaptorFile : public photon::fs::VirtualReadOnlyFile { public: GzAdaptorFile() { @@ -133,6 +134,9 @@ class GzStreamFile : public IGzFile { len = sizeof(buf); } auto readn = this->read(buf, len); + if (readn <= 0){ + LOG_ERRNO_RETURN(EIO, -1, "read buffer error"); + } offset -= readn; } return cur_offset; @@ -166,9 +170,14 @@ class GzStreamFile : public IGzFile { if (strm.avail_in < 0) { LOG_ERRNO_RETURN(0, -1, "read buffer from uds failed"); } - size_t readn = strm.avail_in; if (strm.avail_in == 0) break; + if (!check_type) { + if (!((uint8_t)in[0] == 0x1f && (uint8_t)in[1] == 0x8b)){ + LOG_ERRNO_RETURN(EIO, -1, "buffer is not gzip type"); + } + check_type = true; + } LOG_DEBUG("recv: `", strm.avail_in); st_size += strm.avail_in; strm.next_in = in; @@ -243,6 +252,7 @@ class GzStreamFile : public IGzFile { ssize_t st_size = 0; IFile *m_file = nullptr; photon::fs::IFileSystem *m_fs = nullptr; + bool check_type = false; const static int CHUNK = 32768; IStream *fstream; diff --git a/src/overlaybd/stream_convertor/CMakeLists.txt b/src/overlaybd/stream_convertor/CMakeLists.txt index 17a7707a..d0465e51 100644 --- a/src/overlaybd/stream_convertor/CMakeLists.txt +++ b/src/overlaybd/stream_convertor/CMakeLists.txt @@ -1,14 +1,16 @@ file(GLOB SOURCE_SERV "*.cpp") -add_executable(stream_conv ${SOURCE_SERV}) -target_include_directories(stream_conv PUBLIC +add_executable(overlaybd-streamConv ${SOURCE_SERV}) +target_include_directories(overlaybd-streamConv PUBLIC ${PHOTON_INCLUDE_DIR} + ${rapidjson_SOURCE_DIR}/include ) -target_link_libraries(stream_conv -photon_static -gzip_lib -gzindex_lib -tar_lib +target_link_libraries(overlaybd-streamConv + photon_static + gzip_lib + gzindex_lib + tar_lib + yaml-cpp ) # if(BUILD_TESTING) diff --git a/src/overlaybd/stream_convertor/config.h b/src/overlaybd/stream_convertor/config.h new file mode 100644 index 00000000..c38216b0 --- /dev/null +++ b/src/overlaybd/stream_convertor/config.h @@ -0,0 +1,52 @@ +/* + Copyright The Overlaybd Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +#pragma once + +#include +#include +#include "config_utils.h" + +namespace App { + +struct LogConfigPara : public App::ConfigGroup { + APPCFG_CLASS + APPCFG_PARA(level, uint32_t, 1); + APPCFG_PARA(path, std::string, "/var/log/overlaybd/stream-convertor.log"); + APPCFG_PARA(limitSizeMB, uint32_t, 10); + APPCFG_PARA(rotateNum, int, 3); + APPCFG_PARA(mode, std::string, "stdout"); +}; + +// struct ServerConfigPara : public App::ConfigGroup { +// APPCFG_CLASS + +// }; + +struct GlobalConfigPara : public App::ConfigGroup { + APPCFG_CLASS; + APPCFG_PARA(servAddr, std::string, "/var/run/stream_conv.sock"); + APPCFG_PARA(workDir, std::string, "/tmp/stream_conv"); + //APPCFG_PARA(ServerConfig, ServerConfigPara); + APPCFG_PARA(logConfig, LogConfigPara); +}; + +struct AppConfig : public App::ConfigGroup { + APPCFG_CLASS + + APPCFG_PARA(globalConfig, GlobalConfigPara); +}; + +} // namespace ImageConfigNS diff --git a/src/overlaybd/stream_convertor/config_utils.h b/src/overlaybd/stream_convertor/config_utils.h new file mode 100644 index 00000000..40b87f5e --- /dev/null +++ b/src/overlaybd/stream_convertor/config_utils.h @@ -0,0 +1,84 @@ +#pragma once +#include +#include +#include +#include +#include +#include + +namespace App { + +struct ConfigGroup : public YAML::Node { + ConfigGroup() = default; + ConfigGroup(const YAML::Node &node) : YAML::Node(node){}; + ConfigGroup(const std::string &filename) { + parseYAML(std::move(filename)); + } + + void parseYAML(const std::string &fn) { + Clone(YAML::LoadFile(fn)); + } + + static size_t charfilter(char *dst, const char *src, char extract, size_t maxlen = 256UL) { + size_t i; + for (i = 0; (*src) && i < maxlen; i++, src++) { + while (*src == '-') { + src++; + } + if (!(*src)) + break; + dst[i] = *(src); + } + dst[i] = 0; + return i; + } +}; + +#define APPCFG_PARA(ParaName, ParaType, ...) \ + inline ParaType ParaName() const { \ + return operator[](#ParaName).as(__VA_ARGS__); \ + } + +#define APPCFG_CLASS using ConfigGroup::ConfigGroup; + +// merge two yaml nodes +// generate new node with full data +static YAML::Node mergeConfig(const YAML::Node &lhs, const YAML::Node &rhs) { + // if one of nodes is not type of map + // just return rhs + if (lhs.Type() != YAML::NodeType::Map || rhs.Type() != YAML::NodeType::Map) + return YAML::Clone(rhs); + // both are map, merge two maps + YAML::Node ret = YAML::Clone(lhs); + for (auto &node : rhs) { + auto key = node.first.as(); + if (ret[key].IsDefined()) { + // if key exists in lhs, merge recursivily + ret[key] = mergeConfig(lhs[key], node.second); + } else { + // just add as new key + ret[key] = Clone(node.second); + } + } + return ret; +} + +} // namespace App + +namespace YAML { + +template +struct convert { + template + static Node encode(const T &rhs) { + return rhs; + } + + template + static bool decode(const Node &node, T &rhs) { + rhs = T(node); + return true; + } +}; + +} // namespace YAML \ No newline at end of file diff --git a/src/overlaybd/stream_convertor/stream_conv.cpp b/src/overlaybd/stream_convertor/stream_conv.cpp index 8a87e759..11be358f 100644 --- a/src/overlaybd/stream_convertor/stream_conv.cpp +++ b/src/overlaybd/stream_convertor/stream_conv.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -10,62 +11,88 @@ #include #include #include +#include +#include "config.h" #include "../gzip/gz.h" #include "../tar/libtar.h" #include "../../tools/sha256file.h" +#include "photon/common/alog.h" +#include "yaml-cpp/node/node.h" +App::AppConfig gconfig; + class StreamConvertor { public: + + StreamConvertor(App::AppConfig &config) { + workdir = config.globalConfig().workDir(); + serv_addr = config.globalConfig().servAddr(); + } + std::string get_task_id() { auto now = std::chrono::system_clock::now(); - uint64_t us = std::chrono::duration_cast(now.time_since_epoch()).count(); - return std::to_string(us)+"." + std::to_string(rand() % 1000000); + uint64_t us = + std::chrono::duration_cast(now.time_since_epoch()).count(); + return std::to_string(us) + "." + std::to_string(rand() % 1000000); } void serve(photon::net::ISocketStream *sock) { auto start = std::chrono::steady_clock::now(); LOG_DEBUG("Accepted"); auto task_id = get_task_id(); - char recv[65536]; - auto streamfile = open_gzstream_file(sock, 0, true, task_id.c_str()); + auto streamfile = open_gzstream_file(sock, 0, true, task_id.c_str(), workdir.c_str()); DEFER(delete streamfile); auto turboOCI_stream = new UnTar(streamfile, nullptr, 0, 4096, nullptr, true); DEFER(delete turboOCI_stream); auto fn_tar_idx = task_id + ".tar.meta"; - auto tar_idx = lfs->open(fn_tar_idx.c_str(), O_TRUNC | O_CREAT | O_RDWR, 0644); + auto tar_idx = lfs->open(fn_tar_idx.c_str(), O_TRUNC | O_CREAT | O_RDWR, 0644); DEFER(delete tar_idx); auto nitems = turboOCI_stream->dump_tar_headers(tar_idx); - LOG_INFO("` items get in `",nitems, fn_tar_idx); - streamfile->save_index(); + if (nitems < 0) { + sock->close(); + LOG_ERROR("invalid buffer received."); + return; + } + LOG_INFO("` items get in `", nitems, fn_tar_idx); + struct stat st; + streamfile->fstat(&st); + auto fn_gz_idx = streamfile->save_index(); auto dst_tar_idx = streamfile->sha256_checksum() + ".tar.meta"; if (lfs->rename(fn_tar_idx.c_str(), dst_tar_idx.c_str()) != 0) { LOG_ERROR("rename metafile (` --> `) failed.", fn_tar_idx, dst_tar_idx); } - LOG_INFO("save tar meta success. `", dst_tar_idx); + LOG_INFO("save meta success. {gz_idx: `, tar_meta: `} ", fn_gz_idx, dst_tar_idx); auto end = std::chrono::steady_clock::now(); - auto elapsed = std::chrono::duration_cast( end - start); - LOG_INFO("task ` finish. time_elapsed: `ms", task_id, elapsed.count()); + auto elapsed = std::chrono::duration_cast(end - start); + auto decomp_speed = (double)st.st_size / 1000 / elapsed.count(); + LOG_INFO("task ` finish. {time_elapsed: `ms, decode_speed: `MB/s}", + task_id, elapsed.count(), decomp_speed); } - int start_uds_server() { - + int start() { srand(time(NULL)); - lfs = photon::fs::new_localfs_adaptor(workdir); + if (::access(workdir.c_str(), 0)) { + if (mkdir(workdir.c_str(), 0755) != 0){ + LOG_ERRNO_RETURN(0, -1, "create workdir failed."); + } + } + lfs = photon::fs::new_localfs_adaptor(workdir.c_str()); auto uds_handler = [&](photon::net::ISocketStream *s) -> int { LOG_INFO("Accept UDS"); this->serve(s); return 0; }; serv = photon::net::new_uds_server(true); - assert(0 == serv->bind(uds_path)); - assert(0 == serv->listen(100)); - char path[256]; - serv->getsockname(path, 256); - if (strcmp(path, uds_path) != 0) { - LOG_ERRNO_RETURN(0, -1, "get socket name error. ['`' != '`'(expected)]", - path, uds_path); + LOG_INFO("try to bind: `", serv_addr); + if (serv->bind(serv_addr.c_str())|| serv->listen(100)) { + LOG_ERRNO_RETURN(0, -1, "bind sock addr failed."); + } + char path[256]{}; + if ((serv->getsockname(path, 256) < 0) || (strcmp(path, serv_addr.c_str()) != 0)) { + LOG_ERRNO_RETURN(0, -1, "get socket name error. ['`' != '`'(expected)]", path, + serv_addr); } LOG_INFO("uds server listening `", path); serv->set_handler(uds_handler); @@ -82,10 +109,9 @@ class StreamConvertor { photon::fs::IFileSystem *lfs = nullptr; photon::net::ISocketServer *serv = nullptr; - const char *uds_path= "/var/run/stream_conv.sock"; - const char *workdir = "/tmp"; -} *server; - + std::string serv_addr; + std::string workdir; +} * server; static void stop_by_signal(int signal) { LOG_INFO("Got signal ", signal); @@ -93,23 +119,52 @@ static void stop_by_signal(int signal) { LOG_INFO("server stopped"); } -int main(int argc, char *argv[]){ + +int set_log_config(){ + set_log_output_level(gconfig.globalConfig().logConfig().level()); + auto config = gconfig.globalConfig().logConfig(); + if (config.mode() == "file") { + LOG_INFO("redirect log into `, limitSize: `MB, rotateNums: `", + config.path(), config.limitSizeMB(), config.rotateNum()); + auto log_fn = config.path(); + default_logger.log_output = new_log_output_file(log_fn.c_str(), + config.limitSizeMB(), + config.rotateNum() + ); + if (!default_logger.log_output) { + default_logger.log_output = log_output_stdout; + } + } + return 0; +} + +int main(int argc, char *argv[]) { mallopt(M_TRIM_THRESHOLD, 128 * 1024); // prctl(PR_SET_THP_DISABLE, 1); - set_log_output_level(1); - photon::init(photon::INIT_EVENT_DEFAULT | photon::INIT_IO_DEFAULT | - photon::INIT_EVENT_SIGNAL); - //... + // set_log_output_level(1); + photon::init(photon::INIT_EVENT_DEFAULT | photon::INIT_IO_DEFAULT | photon::INIT_EVENT_SIGNAL); + DEFER(photon::fini()); + DEFER(default_logger.log_output = log_output_null;); + photon::block_all_signal(); photon::sync_signal(SIGTERM, &stop_by_signal); photon::sync_signal(SIGINT, &stop_by_signal); photon::sync_signal(SIGTSTP, &stop_by_signal); + + auto cfg_path = argv[1]; + LOG_INFO("parsing config: `", cfg_path); + auto node = YAML::LoadFile(cfg_path); + gconfig = App::mergeConfig(gconfig, node); + if (gconfig.IsNull()) { + LOG_ERRNO_RETURN(0, -1, "parse config file failed."); + } + set_log_config(); + LOG_INFO("start server..."); // photon::sync_signal(SIGPIPE, &ignore_signal); // photon::sync_signal(SIGUSR2, &restart_by_signal); - DEFER(photon::fini()); - server = new StreamConvertor; + server = new StreamConvertor(gconfig); DEFER(delete server); - server->start_uds_server(); + server->start(); return 0; } diff --git a/src/overlaybd/tar/libtar.cpp b/src/overlaybd/tar/libtar.cpp index 7781ea7c..f933e87c 100644 --- a/src/overlaybd/tar/libtar.cpp +++ b/src/overlaybd/tar/libtar.cpp @@ -78,12 +78,17 @@ int UnTar::set_file_perms(const char *filename) { ssize_t UnTar::dump_tar_headers(photon::fs::IFile *as) { ssize_t count = 0; - while (read_header(as) == 0) { + while (true) { + auto next = read_header(as); + if (next == -1) { + return -1; + } + count++; if (TH_ISREG(header)) { auto size = get_size(); file->lseek(((size + T_BLOCKSIZE - 1) / T_BLOCKSIZE) * T_BLOCKSIZE, SEEK_CUR); // skip size } - count++; + if (next != 0) break; } return count; }