diff --git a/include/yproxy.h b/include/yproxy.h index b245cd9..ca3c61e 100644 --- a/include/yproxy.h +++ b/include/yproxy.h @@ -4,6 +4,7 @@ #include "io_adv.h" #include "yreader.h" #include "ywriter.h" +#include "ylister.h" #include #include #include "chunkinfo.h" @@ -76,4 +77,36 @@ class YProxyWriter : public YWriter { std::string getExternalStoragePath() { return storage_path_; } XLogRecPtr getInsertionStorageLsn() { return insertion_rec_ptr_; } +}; + +// list external storage using yproxy +class YProxyLister : public YLister { +public: + explicit YProxyLister(std::shared_ptr adv, ssize_t segindx); + + virtual ~YProxyLister(); + + virtual bool close(); + + virtual std::vector list_relation_chunks(); + virtual std::vector list_chunk_names(); + +protected: + std::vector ConstructListRequest(std::string fileName); + int prepareYproxyConnection(); + + struct message + { + char type; + std::vector content; + int retCode; + }; + message readMessage(); + std::vector readObjectMetaBody(std::vector *body); + +private: + std::shared_ptr adv_; + ssize_t segindx_; + + int client_fd_{-1}; }; \ No newline at end of file diff --git a/src/storage.cpp b/src/storage.cpp index b985081..051fc38 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -20,10 +20,13 @@ #include "s3memory_mgmt.h" #include "storage_lister.h" +#include "yproxy.h" #include "url.h" #include "yezzey_meta.h" +#define USE_YPX_LISTER = 1 + int yezzey_log_level = INFO; int yezzey_ao_log_level = INFO; @@ -460,8 +463,12 @@ int statRelationSpaceUsagePerExternalChunk(Relation aorel, int segno, std::string(walg_config_path), use_gpg_crypto, yproxy_socket); /* we dont need to interact with s3 while in recovery*/ + #ifdef USE_YPX_LISTER + auto lister = YProxyLister(ioadv, GpIdentity.segindex); + #else /* ro - handler */ auto lister = StorageLister(ioadv, GpIdentity.segindex); + #endif /* stat external storage usage */ @@ -471,13 +478,15 @@ int statRelationSpaceUsagePerExternalChunk(Relation aorel, int segno, Assert((*cnt_chunks) >= 0); // do copy; - // list will be allocated in current PostgreSQL mempry context + // list will be allocated via malloc, not PostgreSQL memory context, so should + // be free in the end of function call + // this actually may lead to memory leak in multiple ways *list = (struct yezzeyChunkMeta *)palloc(sizeof(struct yezzeyChunkMeta) * (*cnt_chunks)); for (size_t i = 0; i < *cnt_chunks; ++i) { (*list)[i].chunkSize = meta[i].chunkSize; - (*list)[i].chunkName = pstrdup(meta[i].chunkName.c_str()); + (*list)[i].chunkName = strdup(meta[i].chunkName.c_str()); } /* No local storage cache logic for now */ diff --git a/src/yproxy.cpp b/src/yproxy.cpp index fe5df03..3538a8f 100644 --- a/src/yproxy.cpp +++ b/src/yproxy.cpp @@ -30,6 +30,8 @@ const char MessageTypePut = 43; const char MessageTypeCommandComplete = 44; const char MessageTypeReadyForQuery = 45; const char MessageTypeCopyData = 46; +const char MessageTypeList = 48; +const char MessageTypeObjectMeta = 49; const size_t MSG_HEADER_SIZE = 8; const size_t PROTO_HEADER_SIZE = 4; @@ -307,7 +309,7 @@ int YProxyWriter::readRFQResponce() { uint64_t msgLen = 0; for (int i = 0; i < 8; i++) { msgLen <<= 8; - msgLen += buffer[i]; + msgLen += uint8_t(buffer[i]); } if (msgLen != MSG_HEADER_SIZE + PROTO_HEADER_SIZE) { @@ -331,3 +333,176 @@ int YProxyWriter::readRFQResponce() { } return 0; } + +YProxyLister::YProxyLister(std::shared_ptr adv, ssize_t segindx) + : adv_(adv), segindx_(segindx) { } + + +YProxyLister::~YProxyLister() { close(); } + +bool YProxyLister::close() { + if (client_fd_ != -1) { + ::close(client_fd_); + client_fd_ = -1; + } + return true; +} + +int YProxyLister::prepareYproxyConnection() { + // open unix data socket + + client_fd_ = socket(AF_UNIX, SOCK_STREAM, 0); + if (client_fd_ == -1) { + // throw here? + return -1; + } + + struct sockaddr_un addr; + /* Bind socket to socket name. */ + + memset(&addr, 0, sizeof(addr)); + + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, adv_->yproxy_socket.c_str(), + sizeof(addr.sun_path) - 1); + + auto ret = + ::connect(client_fd_, (const struct sockaddr *)&addr, sizeof(addr)); + + if (ret == -1) { + return -1; + } + + return 0; +} + +std::vector YProxyLister::list_relation_chunks() { + std::vector res; + auto ret = prepareYproxyConnection(); + if (ret != 0) { + // throw? + return res; + } + + auto msg = ConstructListRequest(yezzey_block_file_path(adv_->nspname, adv_->relname, + adv_->coords_, segindx_)); + size_t rc = ::write(client_fd_, msg.data(), msg.size()); + if (rc <= 0) { + // throw? + return res; + } + + std::vector meta; + while(true) { + auto message = readMessage(); + switch (message.type) + { + case MessageTypeObjectMeta: + meta = readObjectMetaBody(&message.content); + res.insert(res.end(), meta.begin(), meta.end()); + break; + case MessageTypeReadyForQuery: + return res; + + default: + //throw? + return res; + } + } +} + +std::vector YProxyLister::list_chunk_names() { + auto chunk_meta = list_relation_chunks(); + std::vector res(chunk_meta.size()); + + for (int i = 0; i < chunk_meta.size(); i++) { + res[i] = chunk_meta[i].chunkName; + } + return res; +} + +std::vector YProxyLister::ConstructListRequest(std::string fileName) { + std::vector buff( + MSG_HEADER_SIZE + PROTO_HEADER_SIZE + fileName.size() + 1, 0); + buff[8] = MessageTypeList; + uint64_t len = buff.size(); + + strncpy(buff.data() + MSG_HEADER_SIZE + PROTO_HEADER_SIZE, fileName.c_str(), + fileName.size()); + + uint64_t cp = len; + for (ssize_t i = 7; i >= 0; --i) { + buff[i] = cp & ((1 << 8) - 1); + cp >>= 8; + } + + return buff; +} + +YProxyLister::message YProxyLister::readMessage() { + YProxyLister::message res; + int len = MSG_HEADER_SIZE; + char buffer[len]; + // try to read small number of bytes in one op + // if failed, give up + int rc = ::read(client_fd_, buffer, len); + if (rc != len) { + // handle + res.retCode = -1; + return res; + } + + uint64_t msgLen = 0; + for (int i = 0; i < 8; i++) { + msgLen <<= 8; + msgLen += uint8_t(buffer[i]); + } + + // substract header + msgLen -= len; + + char data[msgLen]; + rc = ::read(client_fd_, data, msgLen); + + if (rc != msgLen) { + // handle + res.retCode = -1; + return res; + } + + res.type = data[0]; + res.content = std::vector(data, data + msgLen); + return res; +} + +std::vector YProxyLister::readObjectMetaBody(std::vector *body) { + std::vector res; + int i = PROTO_HEADER_SIZE; + while (i < body->size()) + { + std::vector buff; + while (body->at(i) != 0 && i < body->size()) { + buff.push_back(body->at(i)); + i++; + } + i++; + std::string path(buff.begin(), buff.end()); + if (body->size() - i < 8) { + //throw? + return res; + } + int64_t size = 0; + for (int j = i; j < i + 8; j++) { + size <<= 8; + size += uint8_t(body->at(j)); + } + i += 8; + + storageChunkMeta meta; + meta.chunkName = path; + meta.chunkSize = size; + res.push_back(meta); + } + + return res; +} diff --git a/yezzey.c b/yezzey.c index 0c6462e..3b6a947 100644 --- a/yezzey.c +++ b/yezzey.c @@ -721,8 +721,6 @@ Datum yezzey_relation_describe_external_storage_structure_internal( Oid segrelid; #endif - chunkInfo = palloc(sizeof(yezzeyChunkMetaInfo)); - reloid = PG_GETARG_OID(0); segfile_array_cs = NULL; @@ -747,6 +745,8 @@ Datum yezzey_relation_describe_external_storage_structure_internal( /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); + chunkInfo = NULL; + /* * switch to memory context appropriate for multiple function calls */ @@ -792,7 +792,7 @@ Datum yezzey_relation_describe_external_storage_structure_internal( external_bytes = curr_external_bytes; local_commited_bytes = curr_local_commited_bytes; - chunkInfo = repalloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) * + chunkInfo = realloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) * (total_row + cnt_chunks)); for (size_t chunk_index = 0; chunk_index < cnt_chunks; ++chunk_index) { @@ -855,7 +855,7 @@ Datum yezzey_relation_describe_external_storage_structure_internal( external_bytes = curr_external_bytes; local_commited_bytes = curr_local_commited_bytes; - chunkInfo = repalloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) * + chunkInfo = realloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) * (total_row + cnt_chunks)); for (size_t chunk_index = 0; chunk_index < cnt_chunks; @@ -920,6 +920,7 @@ Datum yezzey_relation_describe_external_storage_structure_internal( /* fast track when no results */ MemoryContextSwitchTo(oldcontext); relation_close(aorel, AccessShareLock); + SRF_RETURN_DONE(funcctx); } @@ -932,15 +933,23 @@ Datum yezzey_relation_describe_external_storage_structure_internal( call_cntr = funcctx->call_cntr; attinmeta = funcctx->attinmeta; + chunkInfo = funcctx->user_fctx; if (call_cntr == funcctx->max_calls) { /* no pfree on segfile_array because context will be destroyed */ relation_close(aorel, AccessShareLock); + + /* cleanup */ + for (int j = 0; j < funcctx->max_calls; ++ j) { + free(chunkInfo[j].external_storage_filepath); + } + + free(chunkInfo); + /* do when there is no more left */ SRF_RETURN_DONE(funcctx); } - chunkInfo = funcctx->user_fctx; i = call_cntr; @@ -952,7 +961,7 @@ Datum yezzey_relation_describe_external_storage_structure_internal( values[0] = ObjectIdGetDatum(chunkInfo[i].reloid); values[1] = Int32GetDatum(GpIdentity.segindex); values[2] = Int32GetDatum(chunkInfo[i].segfileindex); - values[3] = CStringGetTextDatum(chunkInfo[i].external_storage_filepath); + values[3] = CStringGetTextDatum(pstrdup(chunkInfo[i].external_storage_filepath)); values[4] = Int64GetDatum(chunkInfo[i].local_bytes); values[5] = Int64GetDatum(chunkInfo[i].local_commited_bytes); values[6] = Int64GetDatum(chunkInfo[i].external_bytes);