Skip to content

Commit

Permalink
Add YProxyLister (#29)
Browse files Browse the repository at this point in the history
* Add YproxyLister class description

* Added implementation of YProxyLister

* Altered StorageLister to YProxyLister

* Correct prefix in YProxyLister

* Fixed storageChunkMeta construction

* Fixed storage.cpp

* Fix name of YProxyLister class

* Fixed message length reading

* Change listing function memory storage

---------

Co-authored-by: Yury Frolov <[email protected]>
Co-authored-by: reshke <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2024
1 parent 5fd0c6f commit 1096878
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 9 deletions.
33 changes: 33 additions & 0 deletions include/yproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "io_adv.h"
#include "yreader.h"
#include "ywriter.h"
#include "ylister.h"
#include <memory>
#include <string>
#include "chunkinfo.h"
Expand Down Expand Up @@ -76,4 +77,36 @@ class YProxyWriter : public YWriter {
std::string getExternalStoragePath() { return storage_path_; }

XLogRecPtr getInsertionStorageLsn() { return insertion_rec_ptr_; }
};

// list external storage using yproxy
//
// Derives from YLister. A listing connects to the unix socket named by
// adv_->yproxy_socket, sends a list request and collects the object
// metadata messages sent back until a ready-for-query message arrives.
class YProxyLister : public YLister {
public:
// adv is kept in adv_ and supplies the socket path and relation identity;
// segindx is kept in segindx_ and is used when building the listed path.
explicit YProxyLister(std::shared_ptr<IOadv> adv, ssize_t segindx);

// Calls close().
virtual ~YProxyLister();

// Closes the socket if one is open and resets client_fd_ to -1.
// Always returns true.
virtual bool close();

// Returns name and size of every chunk reported by yproxy for the relation.
// On a connection, write or protocol problem the entries gathered so far
// (possibly none) are returned; no error is reported to the caller.
virtual std::vector<storageChunkMeta> list_relation_chunks();
// Same listing as list_relation_chunks(), reduced to the chunk names.
virtual std::vector<std::string> list_chunk_names();

protected:
// Builds the wire representation of a list request for fileName:
// 8-byte big-endian total length, protocol header carrying the message
// type, then the NUL-terminated file name.
std::vector<char> ConstructListRequest(std::string fileName);
// Opens the unix stream socket and connects it to yproxy.
// Returns 0 on success and -1 on failure.
int prepareYproxyConnection();

// One message received from yproxy.
struct message
{
// message type, taken from the first byte of the body
char type;
// message body without the 8-byte length prefix (the type byte is included)
std::vector<char> content;
// -1 signals that the message could not be read
int retCode;
};
// Reads one length-prefixed message from client_fd_.
message readMessage();
// Decodes the body of an object-meta message: a sequence of
// (NUL-terminated name, 8-byte big-endian size) entries that starts after
// the protocol header.
std::vector<storageChunkMeta> readObjectMetaBody(std::vector<char> *body);

private:
std::shared_ptr<IOadv> adv_;
ssize_t segindx_;

// socket connected to yproxy; -1 while no socket is open
int client_fd_{-1};
};
13 changes: 11 additions & 2 deletions src/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
#include "s3memory_mgmt.h"

#include "storage_lister.h"
#include "yproxy.h"
#include "url.h"

#include "yezzey_meta.h"

// Build-time switch: when defined, external storage is listed through
// YProxyLister instead of StorageLister.
// Note: an object-like macro takes no '='; "#define X = 1" expands to the
// tokens "= 1", which breaks any "#if X" or value use of the macro.
#define USE_YPX_LISTER 1

int yezzey_log_level = INFO;
int yezzey_ao_log_level = INFO;

Expand Down Expand Up @@ -460,8 +463,12 @@ int statRelationSpaceUsagePerExternalChunk(Relation aorel, int segno,
std::string(walg_config_path), use_gpg_crypto, yproxy_socket);
/* we dont need to interact with s3 while in recovery*/

#ifdef USE_YPX_LISTER
auto lister = YProxyLister(ioadv, GpIdentity.segindex);
#else
/* ro - handler */
auto lister = StorageLister(ioadv, GpIdentity.segindex);
#endif

/* stat external storage usage */

Expand All @@ -471,13 +478,15 @@ int statRelationSpaceUsagePerExternalChunk(Relation aorel, int segno,
Assert((*cnt_chunks) >= 0);

// do copy;
// list will be allocated in current PostgreSQL mempry context
// list will be allocated via malloc, not PostgreSQL memory context, so should
// be free in the end of function call
// this actually may lead to memory leak in multiple ways
*list = (struct yezzeyChunkMeta *)palloc(sizeof(struct yezzeyChunkMeta) *
(*cnt_chunks));

for (size_t i = 0; i < *cnt_chunks; ++i) {
(*list)[i].chunkSize = meta[i].chunkSize;
(*list)[i].chunkName = pstrdup(meta[i].chunkName.c_str());
(*list)[i].chunkName = strdup(meta[i].chunkName.c_str());
}

/* No local storage cache logic for now */
Expand Down
177 changes: 176 additions & 1 deletion src/yproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const char MessageTypePut = 43;
const char MessageTypeCommandComplete = 44;
const char MessageTypeReadyForQuery = 45;
const char MessageTypeCopyData = 46;
const char MessageTypeList = 48;
const char MessageTypeObjectMeta = 49;

const size_t MSG_HEADER_SIZE = 8;
const size_t PROTO_HEADER_SIZE = 4;
Expand Down Expand Up @@ -307,7 +309,7 @@ int YProxyWriter::readRFQResponce() {
uint64_t msgLen = 0;
for (int i = 0; i < 8; i++) {
msgLen <<= 8;
msgLen += buffer[i];
msgLen += uint8_t(buffer[i]);
}

if (msgLen != MSG_HEADER_SIZE + PROTO_HEADER_SIZE) {
Expand All @@ -331,3 +333,176 @@ int YProxyWriter::readRFQResponce() {
}
return 0;
}

YProxyLister::YProxyLister(std::shared_ptr<IOadv> adv, ssize_t segindx)
: adv_(adv), segindx_(segindx) { }


// Releases the yproxy socket if it is still open.
YProxyLister::~YProxyLister() {
  this->close();
}

// Closes the yproxy socket when one is open and marks the lister as
// disconnected. Safe to call repeatedly; always reports success.
bool YProxyLister::close() {
  const bool connected = (client_fd_ != -1);
  if (!connected) {
    return true;
  }

  ::close(client_fd_);
  client_fd_ = -1;
  return true;
}

int YProxyLister::prepareYproxyConnection() {
// open unix data socket

client_fd_ = socket(AF_UNIX, SOCK_STREAM, 0);
if (client_fd_ == -1) {
// throw here?
return -1;
}

struct sockaddr_un addr;
/* Bind socket to socket name. */

memset(&addr, 0, sizeof(addr));

addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, adv_->yproxy_socket.c_str(),
sizeof(addr.sun_path) - 1);

auto ret =
::connect(client_fd_, (const struct sockaddr *)&addr, sizeof(addr));

if (ret == -1) {
return -1;
}

return 0;
}

// Asks yproxy for the chunks of the relation described by adv_ / segindx_.
//
// Connects, sends one list request and then consumes messages:
//   - object-meta messages contribute entries to the result,
//   - a ready-for-query message ends the listing.
// Returns the collected entries. When the connection cannot be established,
// the request cannot be written, a message cannot be read, or an unexpected
// message type shows up, the entries gathered so far (possibly none) are
// returned.
std::vector<storageChunkMeta> YProxyLister::list_relation_chunks() {
  std::vector<storageChunkMeta> res;
  if (prepareYproxyConnection() != 0) {
    // throw?
    return res;
  }

  auto msg = ConstructListRequest(yezzey_block_file_path(
      adv_->nspname, adv_->relname, adv_->coords_, segindx_));

  // write() may accept only part of the buffer; keep going until the whole
  // request is out. The result must stay signed so that -1 is recognised.
  const char *cursor = msg.data();
  size_t left = msg.size();
  while (left > 0) {
    ssize_t rc = ::write(client_fd_, cursor, left);
    if (rc <= 0) {
      // throw?
      return res;
    }
    cursor += rc;
    left -= static_cast<size_t>(rc);
  }

  while (true) {
    auto message = readMessage();
    // A message that could not be read carries no body (a valid one holds
    // at least the type byte); its type field must not be inspected.
    if (message.content.empty()) {
      // throw?
      return res;
    }

    switch (message.type) {
    case MessageTypeObjectMeta: {
      auto meta = readObjectMetaBody(&message.content);
      res.insert(res.end(), meta.begin(), meta.end());
      break;
    }
    case MessageTypeReadyForQuery:
      // listing finished
      return res;

    default:
      // unexpected message type; throw?
      return res;
    }
  }
}

// Lists the relation chunks and keeps only their names, in listing order.
std::vector<std::string> YProxyLister::list_chunk_names() {
  const auto chunks = list_relation_chunks();

  std::vector<std::string> names;
  names.reserve(chunks.size());
  for (const auto &chunk : chunks) {
    names.push_back(chunk.chunkName);
  }
  return names;
}

// Serialises a list request for fileName.
//
// Layout: 8 bytes big-endian total message length, protocol header whose
// first byte is the message type, then the file name followed by a NUL.
std::vector<char> YProxyLister::ConstructListRequest(std::string fileName) {
  const size_t nameOffset = MSG_HEADER_SIZE + PROTO_HEADER_SIZE;

  // zero-filled, so header padding and the trailing NUL are already in place
  std::vector<char> packet(nameOffset + fileName.size() + 1, 0);

  // total length, most significant byte first
  const uint64_t total = packet.size();
  for (int byteNo = 0; byteNo < 8; ++byteNo) {
    packet[byteNo] = (total >> (8 * (7 - byteNo))) & 0xFF;
  }

  packet[MSG_HEADER_SIZE] = MessageTypeList;

  strncpy(packet.data() + nameOffset, fileName.c_str(), fileName.size());

  return packet;
}

// Reads one message from client_fd_.
//
// Wire format: 8 bytes big-endian total length (the 8 bytes included),
// followed by the body, whose first byte is the message type.
//
// On success: retCode == 0, content holds the whole body, type == content[0].
// On failure (read error, end of stream, or a length that leaves no room
// for the type byte): retCode == -1, type == 0 and content is empty.
YProxyLister::message YProxyLister::readMessage() {
  YProxyLister::message res;
  res.type = 0;
  res.retCode = -1; // switched to 0 only after a complete message was read

  // read() on a stream socket may deliver fewer bytes than requested;
  // repeat until `want` bytes arrived or the stream fails / ends.
  auto readExact = [this](char *dst, size_t want) -> bool {
    size_t got = 0;
    while (got < want) {
      ssize_t rc = ::read(client_fd_, dst + got, want - got);
      if (rc <= 0) {
        return false;
      }
      got += static_cast<size_t>(rc);
    }
    return true;
  };

  char header[MSG_HEADER_SIZE];
  if (!readExact(header, MSG_HEADER_SIZE)) {
    return res;
  }

  uint64_t msgLen = 0;
  for (size_t i = 0; i < MSG_HEADER_SIZE; i++) {
    msgLen <<= 8;
    msgLen += uint8_t(header[i]);
  }

  // The length counts the header itself; anything not larger than the
  // header has no type byte (and would underflow the subtraction below).
  if (msgLen <= MSG_HEADER_SIZE) {
    return res;
  }
  const uint64_t bodyLen = msgLen - MSG_HEADER_SIZE;

  // The length comes from the peer, so it must not size a stack array.
  // Grow a heap buffer piece by piece, only as data actually arrives.
  const size_t pieceSize = 64 * 1024;
  std::vector<char> body;
  while (body.size() < bodyLen) {
    const size_t have = body.size();
    const uint64_t missing = bodyLen - have;
    const size_t want =
        missing < pieceSize ? static_cast<size_t>(missing) : pieceSize;

    body.resize(have + want);
    if (!readExact(body.data() + have, want)) {
      return res;
    }
  }

  res.type = body[0];
  res.content.swap(body);
  res.retCode = 0;
  return res;
}

// Decodes the body of an object-meta message.
//
// After the protocol header the body is a sequence of entries, each made of
// a NUL-terminated object name followed by an 8-byte big-endian size.
//
// Returns one storageChunkMeta per complete entry. Parsing stops at the
// first malformed entry (name without terminator, or fewer than 8 bytes left
// for the size) and the entries decoded up to that point are returned.
std::vector<storageChunkMeta> YProxyLister::readObjectMetaBody(std::vector<char> *body) {
  std::vector<storageChunkMeta> res;
  if (body == nullptr) {
    return res;
  }

  const std::vector<char> &buf = *body;
  const size_t total = buf.size();
  const size_t sizeFieldLen = 8;

  size_t pos = PROTO_HEADER_SIZE;
  while (pos < total) {
    // Find the end of the name; the bound is checked before the byte is
    // read so that a missing terminator cannot run past the buffer.
    size_t nameEnd = pos;
    while (nameEnd < total && buf[nameEnd] != 0) {
      ++nameEnd;
    }
    if (nameEnd == total) {
      // name is not terminated; throw?
      return res;
    }
    std::string path(buf.data() + pos, nameEnd - pos);
    pos = nameEnd + 1; // skip the terminator; pos <= total holds here

    if (total - pos < sizeFieldLen) {
      // throw?
      return res;
    }

    // accumulate unsigned: shifting a signed value into the sign bit is
    // not well defined
    uint64_t rawSize = 0;
    for (size_t j = 0; j < sizeFieldLen; ++j) {
      rawSize = (rawSize << 8) | uint8_t(buf[pos + j]);
    }
    pos += sizeFieldLen;

    storageChunkMeta meta;
    meta.chunkName = path;
    meta.chunkSize = static_cast<int64_t>(rawSize);
    res.push_back(meta);
  }

  return res;
}
21 changes: 15 additions & 6 deletions yezzey.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,8 +721,6 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
Oid segrelid;
#endif

chunkInfo = palloc(sizeof(yezzeyChunkMetaInfo));

reloid = PG_GETARG_OID(0);

segfile_array_cs = NULL;
Expand All @@ -747,6 +745,8 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();

chunkInfo = NULL;

/*
* switch to memory context appropriate for multiple function calls
*/
Expand Down Expand Up @@ -792,7 +792,7 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
external_bytes = curr_external_bytes;
local_commited_bytes = curr_local_commited_bytes;

chunkInfo = repalloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) *
chunkInfo = realloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) *
(total_row + cnt_chunks));

for (size_t chunk_index = 0; chunk_index < cnt_chunks; ++chunk_index) {
Expand Down Expand Up @@ -855,7 +855,7 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
external_bytes = curr_external_bytes;
local_commited_bytes = curr_local_commited_bytes;

chunkInfo = repalloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) *
chunkInfo = realloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) *
(total_row + cnt_chunks));

for (size_t chunk_index = 0; chunk_index < cnt_chunks;
Expand Down Expand Up @@ -920,6 +920,7 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
/* fast track when no results */
MemoryContextSwitchTo(oldcontext);
relation_close(aorel, AccessShareLock);

SRF_RETURN_DONE(funcctx);
}

Expand All @@ -932,15 +933,23 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
call_cntr = funcctx->call_cntr;

attinmeta = funcctx->attinmeta;
chunkInfo = funcctx->user_fctx;

if (call_cntr == funcctx->max_calls) {
/* no pfree on segfile_array because context will be destroyed */
relation_close(aorel, AccessShareLock);

/* cleanup */
for (int j = 0; j < funcctx->max_calls; ++ j) {
free(chunkInfo[j].external_storage_filepath);
}

free(chunkInfo);

/* do when there is no more left */
SRF_RETURN_DONE(funcctx);
}

chunkInfo = funcctx->user_fctx;

i = call_cntr;

Expand All @@ -952,7 +961,7 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
values[0] = ObjectIdGetDatum(chunkInfo[i].reloid);
values[1] = Int32GetDatum(GpIdentity.segindex);
values[2] = Int32GetDatum(chunkInfo[i].segfileindex);
values[3] = CStringGetTextDatum(chunkInfo[i].external_storage_filepath);
values[3] = CStringGetTextDatum(pstrdup(chunkInfo[i].external_storage_filepath));
values[4] = Int64GetDatum(chunkInfo[i].local_bytes);
values[5] = Int64GetDatum(chunkInfo[i].local_commited_bytes);
values[6] = Int64GetDatum(chunkInfo[i].external_bytes);
Expand Down

0 comments on commit 1096878

Please sign in to comment.