Skip to content

Commit

Permalink
GH-37670: [C++] IO FileInterface extend from enable_shared_from_this (#…
Browse files Browse the repository at this point in the history
…37713)

### Rationale for this change

S3 `FlushAsync` might has lifetime problem, this patch fixes that.

### What changes are included in this PR?

1. Move `enable_shared_from_this` to `FileInterface`
2. Update S3 `FlushAsync`
3. Implement sync Flush to avoid call `share_from_this` in dtor.

### Are these changes tested?

no

### Are there any user-facing changes?

no

* Closes: #37670

Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
3 people authored Sep 19, 2023
1 parent 76c4a6e commit 0e6a683
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 42 deletions.
85 changes: 49 additions & 36 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1454,14 +1454,7 @@ class ObjectOutputStream final : public io::OutputStream {

// OutputStream interface

Status Close() override {
auto fut = CloseAsync();
return fut.status();
}

Future<> CloseAsync() override {
if (closed_) return Status::OK();

Status EnsureReadyToFlushFromClose() {
if (current_part_) {
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
Expand All @@ -1472,36 +1465,56 @@ class ObjectOutputStream final : public io::OutputStream {
RETURN_NOT_OK(UploadPart("", 0));
}

// Wait for in-progress uploads to finish (if async writes are enabled)
return FlushAsync().Then([this]() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
return Status::OK();
}

// At this point, all part uploads have finished successfully
DCHECK_GT(part_number_, 1);
DCHECK_EQ(upload_state_->completed_parts.size(),
static_cast<size_t>(part_number_ - 1));

S3Model::CompletedMultipartUpload completed_upload;
completed_upload.SetParts(upload_state_->completed_parts);
S3Model::CompleteMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);
req.SetMultipartUpload(std::move(completed_upload));

auto outcome =
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When completing multiple part upload for key '",
path_.key, "' in bucket '", path_.bucket, "': "),
"CompleteMultipartUpload", outcome.GetError());
}
Status FinishPartUploadAfterFlush() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

holder_ = nullptr;
closed_ = true;
return Status::OK();
});
// At this point, all part uploads have finished successfully
DCHECK_GT(part_number_, 1);
DCHECK_EQ(upload_state_->completed_parts.size(),
static_cast<size_t>(part_number_ - 1));

S3Model::CompletedMultipartUpload completed_upload;
completed_upload.SetParts(upload_state_->completed_parts);
S3Model::CompleteMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);
req.SetMultipartUpload(std::move(completed_upload));

auto outcome =
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When completing multiple part upload for key '",
path_.key, "' in bucket '", path_.bucket, "': "),
"CompleteMultipartUpload", outcome.GetError());
}

holder_ = nullptr;
closed_ = true;
return Status::OK();
}

Status Close() override {
if (closed_) return Status::OK();

RETURN_NOT_OK(EnsureReadyToFlushFromClose());

RETURN_NOT_OK(Flush());
return FinishPartUploadAfterFlush();
}

Future<> CloseAsync() override {
if (closed_) return Status::OK();

RETURN_NOT_OK(EnsureReadyToFlushFromClose());

auto self = std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
// Wait for in-progress uploads to finish (if async writes are enabled)
return FlushAsync().Then([self]() { return self->FinishPartUploadAfterFlush(); });
}

bool closed() const override { return closed_; }
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,21 @@ class TestS3FS : public S3TestMixin {
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}

void TestOpenOutputStreamCloseAsyncDestructor() {
std::shared_ptr<io::OutputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
ASSERT_OK(stream->Write("new data"));
// Destructor implicitly closes stream and completes the multipart upload.
// GH-37670: Testing it doesn't matter whether flush is triggered asynchronously
// after CloseAsync or synchronously after stream.reset() since we're just
// checking that `closeAsyncFut` keeps the stream alive until completion
// rather than segfaulting on a dangling stream
auto closeAsyncFut = stream->CloseAsync();
stream.reset();
ASSERT_OK(closeAsyncFut.MoveResult());
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}

protected:
S3Options options_;
std::shared_ptr<S3FileSystem> fs_;
Expand Down Expand Up @@ -1177,6 +1192,16 @@ TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) {
TestOpenOutputStreamDestructor();
}

TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorBackgroundWrites) {
TestOpenOutputStreamCloseAsyncDestructor();
}

TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
options_.background_writes = false;
MakeFileSystem();
TestOpenOutputStreamCloseAsyncDestructor();
}

TEST_F(TestS3FS, OpenOutputStreamMetadata) {
std::shared_ptr<io::OutputStream> stream;

Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ Result<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadata() {
// executor
Future<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadataAsync(
const IOContext& ctx) {
auto self = shared_from_this();
std::shared_ptr<InputStream> self =
std::dynamic_pointer_cast<InputStream>(shared_from_this());
return DeferNotOk(internal::SubmitIO(ctx, [self] { return self->ReadMetadata(); }));
}

Expand Down Expand Up @@ -165,7 +166,7 @@ Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx,
int64_t position,
int64_t nbytes) {
auto self = checked_pointer_cast<RandomAccessFile>(shared_from_this());
auto self = std::dynamic_pointer_cast<RandomAccessFile>(shared_from_this());
return DeferNotOk(internal::SubmitIO(
ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
}
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ struct ARROW_EXPORT IOContext {
StopToken stop_token_;
};

class ARROW_EXPORT FileInterface {
class ARROW_EXPORT FileInterface : public std::enable_shared_from_this<FileInterface> {
public:
virtual ~FileInterface() = 0;

Expand Down Expand Up @@ -205,9 +205,7 @@ class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable
OutputStream() = default;
};

class ARROW_EXPORT InputStream : virtual public FileInterface,
virtual public Readable,
public std::enable_shared_from_this<InputStream> {
class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public Readable {
public:
/// \brief Advance or skip stream indicated number of bytes
/// \param[in] nbytes the number to move forward
Expand Down

0 comments on commit 0e6a683

Please sign in to comment.