Skip to content

Commit

Permalink
uploads to s3 using multipart uploads. Testing seems to be working, but is incomplete.
Browse files Browse the repository at this point in the history
  • Loading branch information
rw2 authored and jhiemstrawisc committed Aug 22, 2024
1 parent 6d3ca14 commit 8e0c9a4
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 8 deletions.
72 changes: 72 additions & 0 deletions src/S3Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,52 @@ bool AmazonS3Upload::SendRequest(const std::string &payload, off_t offset,

// ---------------------------------------------------------------------------

AmazonS3CompleteMultipartUpload::~AmazonS3CompleteMultipartUpload() = default;

// Complete a multipart upload by POSTing the part-number/ETag manifest.
//
// `eTags` must contain one entry per uploaded part; `partNumber` is the NEXT
// (unused) part number, i.e. parts 1..partNumber-1 have been uploaded.
// Returns the result of the underlying S3 request, or false on a
// part/ETag count mismatch.
bool AmazonS3CompleteMultipartUpload::SendRequest(
	const std::vector<std::string> &eTags, int partNumber,
	const std::string &uploadId) {
	// Guard against a caller passing fewer ETags than parts — the loop
	// below would otherwise index past the end of the vector (UB).
	if (partNumber < 1 ||
		eTags.size() < static_cast<size_t>(partNumber - 1)) {
		return false;
	}

	query_parameters["uploadId"] = uploadId;

	httpVerb = "POST";
	std::string payload;
	payload += "<CompleteMultipartUpload "
			   "xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">";
	for (int i = 1; i < partNumber; i++) {
		payload += "<Part>";
		payload += "<ETag>" + eTags[i - 1] + "</ETag>";
		payload += "<PartNumber>" + std::to_string(i) + "</PartNumber>";
		payload += "</Part>";
	}
	payload += "</CompleteMultipartUpload>";

	return SendS3Request(payload);
}
// ---------------------------------------------------------------------------

AmazonS3CreateMultipartUpload::~AmazonS3CreateMultipartUpload() = default;
AmazonS3SendMultipartPart::~AmazonS3SendMultipartPart() = default;

// Initiate a multipart upload: a POST with the (empty-valued) "uploads"
// query parameter and an empty body.  The response XML is parsed later
// by Results().
bool AmazonS3CreateMultipartUpload::SendRequest() {
	httpVerb = "POST";
	query_parameters["uploads"] = "";
	query_parameters["x-id"] = "CreateMultipartUpload";

	return SendS3Request("");
}

// Upload a single part of a multipart upload.  Each part is a PUT tagged
// with its part number and the upload id obtained from the initiation
// request.
bool AmazonS3SendMultipartPart::SendRequest(const std::string &payload,
											const std::string &partNumber,
											const std::string &uploadId) {
	httpVerb = "PUT";
	// Ask for the response headers back so the caller can extract the
	// part's ETag (needed to complete the upload).
	includeResponseHeader = true;
	query_parameters["uploadId"] = uploadId;
	query_parameters["partNumber"] = partNumber;
	return SendS3Request(payload);
}

// ---------------------------------------------------------------------------

AmazonS3Download::~AmazonS3Download() = default;

bool AmazonS3Download::SendRequest(off_t offset, size_t size) {
Expand Down Expand Up @@ -505,6 +551,32 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) {
return SendS3Request("");
}

// Parse the CreateMultipartUpload response and extract the upload id.
//
// Expects an XML document rooted at <InitiateMultipartUploadResult>
// containing an <UploadId> element.  On success, `uploadId` is filled in
// and true is returned; otherwise `errMsg` describes the failure and
// false is returned.
bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId,
											std::string &errMsg) {
	tinyxml2::XMLDocument doc;
	auto err = doc.Parse(resultString.c_str());
	if (err != tinyxml2::XML_SUCCESS) {
		errMsg = doc.ErrorStr();
		return false;
	}

	auto elem = doc.RootElement();
	// RootElement() is null for an empty document; check before Name().
	if (!elem || strcmp(elem->Name(), "InitiateMultipartUploadResult")) {
		errMsg = "S3 Uploads response is not rooted with "
				 "InitiateMultipartUploadResult "
				 "element";
		return false;
	}

	for (auto child = elem->FirstChildElement(); child != nullptr;
		 child = child->NextSiblingElement()) {
		if (!strcmp(child->Name(), "UploadId")) {
			// GetText() returns nullptr for an empty element; constructing
			// a std::string from nullptr is undefined behavior, so check.
			auto text = child->GetText();
			if (!text) {
				errMsg = "UploadId element in S3 response is empty";
				return false;
			}
			uploadId = text;
			return true;
		}
	}
	// Previously this fell through and returned true with an empty
	// uploadId; treat a missing UploadId as an error instead.
	errMsg = "S3 response did not include an UploadId element";
	return false;
}

// Parse the results of the AWS directory listing
//
// S3 returns an XML structure for directory listings so we must pick it apart
Expand Down
74 changes: 74 additions & 0 deletions src/S3Commands.hh
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,80 @@ class AmazonS3Upload : public AmazonRequest {
std::string path;
};

// Initiates an S3 multipart upload (POST ?uploads) for an object.
// After SendRequest() succeeds, call Results() to extract the UploadId
// required by the per-part and completion requests.
class AmazonS3CreateMultipartUpload : public AmazonRequest {
	using AmazonRequest::SendRequest;

  public:
	AmazonS3CreateMultipartUpload(const S3AccessInfo &ai,
								  const std::string &objectName,
								  XrdSysError &log)
		: AmazonRequest(ai, objectName, log) {}

	// NOTE(review): the literal 4 selects some variant in the AmazonRequest
	// base constructor — confirm its meaning against AmazonRequest's ctor.
	AmazonS3CreateMultipartUpload(const std::string &s, const std::string &akf,
								  const std::string &skf, const std::string &b,
								  const std::string &o,
								  const std::string &style, XrdSysError &log)
		: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

	// Parse the InitiateMultipartUploadResult XML response; on success
	// uploadId is filled in, otherwise errMsg describes the failure.
	bool Results(std::string &uploadId, std::string &errMsg);

	virtual ~AmazonS3CreateMultipartUpload();

	virtual bool SendRequest();

  protected:
	// std::string path;
};

// Finalizes an S3 multipart upload (POST ?uploadId=...) by sending the
// part-number/ETag manifest collected while uploading parts.
class AmazonS3CompleteMultipartUpload : public AmazonRequest {
	using AmazonRequest::SendRequest;

  public:
	AmazonS3CompleteMultipartUpload(const S3AccessInfo &ai,
									const std::string &objectName,
									XrdSysError &log)
		: AmazonRequest(ai, objectName, log) {}

	// NOTE(review): the literal 4 selects some variant in the AmazonRequest
	// base constructor — confirm its meaning against AmazonRequest's ctor.
	AmazonS3CompleteMultipartUpload(const std::string &s,
									const std::string &akf,
									const std::string &skf,
									const std::string &b, const std::string &o,
									const std::string &style, XrdSysError &log)
		: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

	virtual ~AmazonS3CompleteMultipartUpload();

	// eTags holds one ETag per uploaded part; partNumber is the next
	// (unused) part number, so parts 1..partNumber-1 are finalized.
	virtual bool SendRequest(const std::vector<std::string> &eTags,
							 int partNumber, const std::string &uploadId);

  protected:
};

// Uploads one part (PUT ?partNumber=...&uploadId=...) of a multipart
// upload.  The response headers are retained so the caller can extract
// the part's ETag.
class AmazonS3SendMultipartPart : public AmazonRequest {
	using AmazonRequest::SendRequest;

  public:
	AmazonS3SendMultipartPart(const S3AccessInfo &ai,
							  const std::string &objectName, XrdSysError &log)
		: AmazonRequest(ai, objectName, log) {}

	// NOTE(review): the literal 4 selects some variant in the AmazonRequest
	// base constructor — confirm its meaning against AmazonRequest's ctor.
	AmazonS3SendMultipartPart(const std::string &s, const std::string &akf,
							  const std::string &skf, const std::string &b,
							  const std::string &o, const std::string &style,
							  XrdSysError &log)
		: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

	// NOTE(review): declared here, but no definition appears in the visible
	// part of S3Commands.cc — confirm it exists, or remove if this is a
	// copy-paste leftover from AmazonS3CreateMultipartUpload.
	bool Results(std::string &uploadId, std::string &errMsg);

	virtual ~AmazonS3SendMultipartPart();

	virtual bool SendRequest(const std::string &payload,
							 const std::string &partNumber,
							 const std::string &uploadId);

  protected:
};

class AmazonS3Download : public AmazonRequest {
using AmazonRequest::SendRequest;

Expand Down
92 changes: 84 additions & 8 deletions src/S3File.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <memory>
#include <mutex>
#include <sstream>
#include <stdlib.h>
#include <string>
#include <vector>

Expand All @@ -46,9 +47,17 @@ S3FileSystem *g_s3_oss = nullptr;
XrdVERSIONINFO(XrdOssGetFileSystem, S3);

// Construct a file handle.  partNumber starts at 1 because S3 multipart
// part numbers are 1-based; write_buffer accumulates bytes until a part
// is flushed.  (Cleans up diff residue: the pre-multipart initializer
// list was left interleaved here.)
S3File::S3File(XrdSysError &log, S3FileSystem *oss)
	: m_log(log), m_oss(oss), content_length(0), last_modified(0),
	  write_buffer(""), partNumber(1) {}

// Open the S3 object backing this handle.  Note that this eagerly starts
// an S3 multipart upload — presumably even for read-only opens; TODO
// confirm that is intended (the middle of this function is not visible
// here).
int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
	// BUG(review): 'Oflag && O_CREAT' is a logical AND — it is true for ANY
	// nonzero Oflag.  The flag test should be bitwise: (Oflag & O_CREAT).
	if (Oflag && O_CREAT) {
		m_log.Log(LogMask::Info, "File opened for creation: ", path);
	}
	// BUG(review): same here — should be (Oflag & O_APPEND).
	if (Oflag && O_APPEND) {
		m_log.Log(LogMask::Info, "File opened for append: ", path);
	}

	if (m_log.getMsgMask() & XrdHTTPServer::Debug) {
		m_log.Log(LogMask::Warning, "S3File::Open", "Opening file", path);
	}
Expand Down Expand Up @@ -79,6 +88,14 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
	}
	}

	// Kick off the multipart upload; the returned uploadId is required by
	// every subsequent part upload and by the completion request in Close().
	AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log);
	if (!startUpload.SendRequest()) {
		m_log.Emsg("Open", "S3 multipart request failed");
		return -ENOENT;
	}
	std::string errMsg;
	// NOTE(review): the return value of Results() is ignored — if parsing
	// fails, uploadId stays empty and later part uploads will fail.
	// Consider checking it and returning an error here.
	startUpload.Results(uploadId, errMsg);

	return 0;
}

Expand Down Expand Up @@ -177,21 +194,80 @@ int S3File::Fstat(struct stat *buff) {
}

// Buffer bytes for the in-progress multipart upload, flushing a part
// whenever the buffer exceeds the part-size threshold.
//
// NOTE(review): 'offset' is ignored — bytes are appended in call order,
// so this assumes strictly sequential writes.  TODO confirm callers.
// Returns `size` on success, -ENOENT if flushing a part fails.
ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
	std::string payload(static_cast<const char *>(buffer), size);
	write_buffer += payload;

	// XXX should this be configurable?  100mb gives us a TB of file size
	// (S3 allows at most 10,000 parts).  It doesn't seem terribly useful to
	// be much smaller and it's not clear the S3 API will work if it's much
	// larger.
	if (write_buffer.length() > 100000000) {
		if (SendPart() == -ENOENT) {
			return -ENOENT;
		}
	}
	return size;
}

// Upload the current write_buffer as one part of the multipart upload,
// recording its ETag for the completion request.
//
// Returns the number of bytes uploaded, or -ENOENT on failure.
int S3File::SendPart() {
	int length = write_buffer.length();
	AmazonS3SendMultipartPart upload_part_request(m_ai, m_object, m_log);
	if (!upload_part_request.SendRequest(
			write_buffer, std::to_string(partNumber), uploadId)) {
		m_log.Emsg("SendPart", "upload.SendRequest() failed");
		return -ENOENT;
	}
	m_log.Emsg("SendPart", "upload.SendRequest() succeeded");

	// Pull the part's ETag out of the raw response headers; S3 requires it
	// when completing the multipart upload.  The header is expected to look
	// like: ETag: "<digest>" — so the value starts 7 chars past "ETag:".
	const std::string &resultString = upload_part_request.getResultString();
	std::size_t startPos = resultString.find("ETag:");
	if (startPos == std::string::npos) {
		// Previously this indexed past npos (undefined substr behavior);
		// fail explicitly instead.
		m_log.Emsg("SendPart", "response is missing an ETag header");
		return -ENOENT;
	}
	std::size_t endPos = resultString.find("\"", startPos + 7);
	if (endPos == std::string::npos) {
		m_log.Emsg("SendPart", "malformed ETag header in response");
		return -ENOENT;
	}
	eTags.push_back(resultString.substr(startPos + 7, endPos - startPos - 7));

	partNumber++;
	write_buffer.clear();

	return length;
}

// Flush any buffered bytes as a final (possibly short) part, then
// finalize the multipart upload.
//
// TODO(review): a zero-byte file never sends a part, so the multipart
// upload created in Open() is neither completed nor aborted here, and no
// object is created.
int S3File::Close(long long *retsz) {
	// Drain the write buffer if any bytes are still pending.
	if (write_buffer.length() > 0) {
		if (SendPart() == -ENOENT) {
			return -ENOENT;
		} else {
			m_log.Emsg("Close", "Closed our S3 file");
		}
	}
	// partNumber is the NEXT part index, so partNumber > 1 means at least
	// one part was uploaded and the upload must be finalized.
	if (partNumber > 1) {
		AmazonS3CompleteMultipartUpload complete_upload_request(m_ai, m_object,
																m_log);
		if (!complete_upload_request.SendRequest(eTags, partNumber, uploadId)) {
			// Log tag fixed: these messages come from Close, not SendPart.
			m_log.Emsg("Close", "close.SendRequest() failed");
			return -ENOENT;
		} else {
			m_log.Emsg("Close", "close.SendRequest() succeeded");
		}
	}

	return 0;
}

extern "C" {
Expand Down
6 changes: 6 additions & 0 deletions src/S3File.hh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class S3File : public XrdOssDF {
time_t getLastModified() { return last_modified; }

  private:
	// Upload write_buffer as one multipart part; returns bytes sent or
	// -ENOENT on failure.
	int SendPart();
	XrdSysError &m_log;
	S3FileSystem *m_oss;

Expand All @@ -103,4 +104,9 @@ class S3File : public XrdOssDF {

	size_t content_length;
	time_t last_modified;

	// Bytes accumulated between part flushes (see S3File::Write).
	std::string write_buffer;
	// Multipart upload id obtained in Open(); empty if initiation failed.
	std::string uploadId;
	// NEXT part number to send; S3 part numbers are 1-based.
	int partNumber;
	// One ETag per uploaded part, in part-number order, for Close().
	std::vector<std::string> eTags;
};

0 comments on commit 8e0c9a4

Please sign in to comment.