Use DataBuffer in S3WriteFile (facebookincubator#6592)
Summary:
Pass MemoryPool to ReadFile/WriteFile via FileOptions.
Use DataBuffer instead of std::vector for currentPart_ and track usage via the MemoryPool.
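
A minimal sketch of the caller-side change, assuming an S3 filesystem instance s3fs is already constructed (as in the updated test below); the pool name, URI, and data are placeholders:

#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"

using namespace facebook::velox;

// Create a leaf MemoryPool and hand it to the filesystem through FileOptions;
// S3WriteFile charges its part buffer (a DataBuffer) against this pool.
auto pool = memory::defaultMemoryManager().addLeafPool("s3-write-example");
auto writeFile =
    s3fs.openFileForWrite("s3://bucket/example.bin", {{}, pool.get()});
writeFile->append("some data");
writeFile->close();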

Pull Request resolved: facebookincubator#6592

Reviewed By: mbasmanova

Differential Revision: D49437320

Pulled By: xiaoxmeng

fbshipit-source-id: 2d6bd267e41854365d6ff6dc253e1ef8344f5569
majetideepak authored and ericyuliu committed Oct 12, 2023
1 parent 8a716b3 commit 410784d
Showing 8 changed files with 62 additions and 41 deletions.
4 changes: 4 additions & 0 deletions velox/common/file/FileSystems.h
@@ -16,6 +16,7 @@
#pragma once

#include "velox/common/base/Exceptions.h"
#include "velox/common/memory/MemoryPool.h"

#include <functional>
#include <memory>
@@ -31,8 +32,11 @@ namespace facebook::velox::filesystems {

/// Defines the options for per-file operations. It contains key-value pairs
/// which can be easily extended to different storage systems.
/// MemoryPool to allocate buffers needed to read/write files on FileSystems
/// such as S3.
struct FileOptions {
std::unordered_map<std::string, std::string> values;
memory::MemoryPool* pool{nullptr};
};

/// An abstract FileSystem
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
@@ -19,7 +19,8 @@ if(VELOX_ENABLE_S3)
target_sources(velox_s3fs PRIVATE S3FileSystem.cpp S3Util.cpp)

target_include_directories(velox_s3fs PUBLIC ${AWSSDK_INCLUDE_DIRS})
target_link_libraries(velox_s3fs Folly::folly ${AWSSDK_LIBRARIES} xsimd)
target_link_libraries(velox_s3fs velox_dwio_common_exception Folly::folly
${AWSSDK_LIBRARIES} xsimd)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
@@ -66,7 +66,7 @@ s3WriteFileSinkGenerator() {
auto fileSystem =
filesystems::getFileSystem(fileURI, options.connectorProperties);
return std::make_unique<dwio::common::WriteFileSink>(
fileSystem->openFileForWrite(fileURI),
fileSystem->openFileForWrite(fileURI, {{}, options.pool}),
fileURI,
options.metricLogger,
options.stats);
75 changes: 39 additions & 36 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
@@ -20,6 +20,7 @@
#include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
#include "velox/core/Config.h"
#include "velox/dwio/common/DataBuffer.h"

#include <fmt/format.h>
#include <glog/logging.h>
@@ -210,10 +211,16 @@ namespace filesystems {

class S3WriteFile::Impl {
public:
explicit Impl(const std::string& path, Aws::S3::S3Client* client)
: client_(client) {
explicit Impl(
const std::string& path,
Aws::S3::S3Client* client,
memory::MemoryPool* pool)
: client_(client), pool_(pool) {
VELOX_CHECK_NOT_NULL(client);
VELOX_CHECK_NOT_NULL(pool);
getBucketAndKeyFromS3Path(path, bucket_, key_);

currentPart_ = std::make_unique<dwio::common::DataBuffer<char>>(*pool_);
currentPart_->reserve(kPartUploadSize);
// Check that the object doesn't exist, if it does throw an error.
{
Aws::S3::Model::HeadObjectRequest request;
@@ -255,39 +262,35 @@ class S3WriteFile::Impl {
uploadState_.id = outcome.GetResult().GetUploadId();
}

currentPart_.resize(kPartUploadSize);
closed_ = false;
fileSize_ = 0;
}

// Appends data to the end of the file.
void append(std::string_view data) {
VELOX_CHECK(!closed_, "File is closed");
if (data.size() + currentPartSize_ >= kPartUploadSize) {
VELOX_CHECK(!closed(), "File is closed");
if (data.size() + currentPart_->size() >= kPartUploadSize) {
upload(data);
} else {
// Append to current part.
memcpy(currentPartBuffer() + currentPartSize_, data.data(), data.size());
currentPartSize_ += data.size();
currentPart_->unsafeAppend(data.data(), data.size());
}
fileSize_ += data.size();
}

// No-op.
void flush() {
VELOX_CHECK(!closed_, "File is closed");
VELOX_CHECK(!closed(), "File is closed");
/// currentPartSize must be less than kPartUploadSize since
/// append() would have already flushed after reaching kPartUploadSize.
VELOX_CHECK_LE(currentPartSize_, kPartUploadSize);
VELOX_CHECK_LT(currentPart_->size(), kPartUploadSize);
}

// Complete the multipart upload and close the file.
void close() {
if (closed_) {
if (closed()) {
return;
}
uploadPart({currentPartBuffer(), currentPartSize_}, true);
currentPartSize_ = 0;
uploadPart({currentPart_->data(), currentPart_->size()}, true);
VELOX_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size());
// Complete the multipart upload.
{
@@ -303,7 +306,7 @@
VELOX_CHECK_AWS_OUTCOME(
outcome, "Failed to complete multiple part upload", bucket_, key_);
}
closed_ = true;
currentPart_->clear();
}

// Current file size, i.e. the sum of all previous appends.
@@ -320,6 +323,10 @@
static constexpr const char* kApplicationOctetStream =
"application/octet-stream";

bool closed() const {
return (currentPart_->capacity() == 0);
}

// Holds state for the multipart upload.
struct UploadState {
Aws::Vector<Aws::S3::Model::CompletedPart> completedParts;
@@ -329,26 +336,24 @@
UploadState uploadState_;

// Data can be smaller or larger than the kPartUploadSize.
// Complete the currentPart_ and chunk the remaining data.
// Stash the remaining in the current part.
// Complete the currentPart_ and upload kPartUploadSize chunks of data.
// Save the remaining into currentPart_.
void upload(const std::string_view data) {
auto dataPtr = data.data();
auto dataSize = data.size();
// Fill-up the remaining currentPart_.
auto remainingBufferSize = kPartUploadSize - currentPartSize_;
memcpy(
currentPartBuffer() + currentPartSize_, dataPtr, remainingBufferSize);
uploadPart({currentPartBuffer(), kPartUploadSize});
auto remainingBufferSize = currentPart_->capacity() - currentPart_->size();
currentPart_->unsafeAppend(dataPtr, remainingBufferSize);
uploadPart({currentPart_->data(), currentPart_->size()});
dataPtr += remainingBufferSize;
dataSize -= remainingBufferSize;
while (dataSize > kPartUploadSize) {
uploadPart({dataPtr, kPartUploadSize});
dataPtr += kPartUploadSize;
dataSize -= kPartUploadSize;
}
// stash the remaining in currentPart;
memcpy(currentPartBuffer(), dataPtr, dataSize);
currentPartSize_ = dataSize;
// Stash the remaining at the beginning of currentPart.
currentPart_->unsafeAppend(0, dataPtr, dataSize);
}

void uploadPart(const std::string_view part, bool isLast = false) {
Expand Down Expand Up @@ -377,22 +382,19 @@ class S3WriteFile::Impl {
}
}

char* currentPartBuffer() {
return currentPart_.data();
}

// TODO: Pass a MemoryPool to S3WriteFile use a MemorySink.
std::vector<char> currentPart_;
Aws::S3::S3Client* client_;
memory::MemoryPool* pool_;
std::unique_ptr<dwio::common::DataBuffer<char>> currentPart_;
std::string bucket_;
std::string key_;
size_t fileSize_ = -1;
uint32_t currentPartSize_ = 0;
bool closed_ = true;
};

S3WriteFile::S3WriteFile(const std::string& path, Aws::S3::S3Client* client) {
impl_ = std::make_shared<Impl>(path, client);
S3WriteFile::S3WriteFile(
const std::string& path,
Aws::S3::S3Client* client,
memory::MemoryPool* pool) {
impl_ = std::make_shared<Impl>(path, client, pool);
}

void S3WriteFile::append(std::string_view data) {
@@ -629,9 +631,10 @@ std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(

std::unique_ptr<WriteFile> S3FileSystem::openFileForWrite(
std::string_view path,
const FileOptions& /*unused*/) {
const FileOptions& options) {
const auto file = s3Path(path);
auto s3file = std::make_unique<S3WriteFile>(file, impl_->s3Client());
auto s3file =
std::make_unique<S3WriteFile>(file, impl_->s3Client(), options.pool);
return s3file;
}
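
For reference, the part-splitting in upload() above follows this pattern. This is a standalone sketch with simplified types (std::vector instead of DataBuffer), not the actual implementation, and it assumes the caller only invokes it once the pending data plus the current part reach kPartUploadSize, as append() guarantees:

#include <cstddef>
#include <functional>
#include <string_view>
#include <vector>

// Sketch of the multipart chunking: top up the current part to the part-size
// limit and upload it, upload further full-size chunks straight from the
// input, then stash the tail for the next append().
void uploadSketch(
    std::string_view data,
    std::vector<char>& currentPart,
    size_t kPartUploadSize,
    const std::function<void(std::string_view)>& uploadPart) {
  const char* ptr = data.data();
  size_t remaining = data.size();
  // Fill the current part up to kPartUploadSize and upload it.
  const size_t toFill = kPartUploadSize - currentPart.size();
  currentPart.insert(currentPart.end(), ptr, ptr + toFill);
  uploadPart({currentPart.data(), currentPart.size()});
  ptr += toFill;
  remaining -= toFill;
  // Upload any additional full-size chunks directly from the input.
  while (remaining > kPartUploadSize) {
    uploadPart({ptr, kPartUploadSize});
    ptr += kPartUploadSize;
    remaining -= kPartUploadSize;
  }
  // Stash whatever is left at the beginning of the current part.
  currentPart.assign(ptr, ptr + remaining);
}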

2 changes: 1 addition & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h
@@ -39,7 +39,7 @@ class S3FileSystem : public FileSystem {

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options) override;

void remove(std::string_view path) override {
VELOX_UNSUPPORTED("remove for S3 not implemented");
6 changes: 5 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h
@@ -17,6 +17,7 @@
#pragma once

#include "velox/common/file/File.h"
#include "velox/common/memory/MemoryPool.h"

namespace Aws::S3 {
class S3Client;
@@ -46,7 +47,10 @@ namespace facebook::velox::filesystems {
/// TODO: Implement retry on failure.
class S3WriteFile : public WriteFile {
public:
S3WriteFile(const std::string& path, Aws::S3::S3Client* client);
S3WriteFile(
const std::string& path,
Aws::S3::S3Client* client,
memory::MemoryPool* pool);

/// Appends data to the end of the file.
/// Uploads a part on reaching part size limit.
@@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "velox/common/memory/Memory.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
#include "velox/connectors/hive/storage_adapters/s3fs/tests/S3Test.h"

@@ -186,7 +187,8 @@ TEST_F(S3FileSystemTest, writeFileAndRead) {

auto hiveConfig = minioServer_->hiveConfig();
filesystems::S3FileSystem s3fs(hiveConfig);
auto writeFile = s3fs.openFileForWrite(s3File);
auto pool = memory::defaultMemoryManager().addLeafPool("S3FileSystemTest");
auto writeFile = s3fs.openFileForWrite(s3File, {{}, pool.get()});
auto s3WriteFile = dynamic_cast<filesystems::S3WriteFile*>(writeFile.get());
std::string dataContent =
"Dance me to your beauty with a burning violin"
7 changes: 7 additions & 0 deletions velox/dwio/common/DataBuffer.h
@@ -177,6 +177,13 @@ class DataBuffer {
size_ = (offset + items);
}

void unsafeAppend(const T* src, uint64_t items) {
if (FOLLY_LIKELY(items > 0)) {
std::memcpy(data() + size_, src, sizeInBytes(items));
size_ += items;
}
}

void unsafeAppend(T value) {
buf_[size_++] = value;
}
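
A small usage sketch of the new overload, assuming a leaf MemoryPool named pool is available; reserve() sizes the buffer (charging the pool) and unsafeAppend() copies without growing or bounds-checking:

#include "velox/dwio/common/DataBuffer.h"

using namespace facebook::velox;

// The caller must reserve enough capacity up front; unsafeAppend() does not
// grow the buffer.
dwio::common::DataBuffer<char> buffer(*pool);
buffer.reserve(1024);
const char* src = "abc";
buffer.unsafeAppend(src, 3); // copies 3 bytes at the current end; size() is now 3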
