From 89899b4575e02c8cecb879fdc7e4e278653f506f Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 8 Nov 2023 23:59:15 +0000 Subject: [PATCH 01/26] Paste in object append stream from #12914 --- cpp/src/arrow/filesystem/azurefs.cc | 190 ++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index bd0e353e4a03a..0cfe30c58fd7d 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -461,6 +461,196 @@ class ObjectInputFile final : public io::RandomAccessFile { int64_t content_length_ = kNoSize; std::shared_ptr metadata_; }; + +class ObjectAppendStream final : public io::OutputStream { + public: + ObjectAppendStream( + std::shared_ptr& path_client, + std::shared_ptr& file_client, + std::shared_ptr& blob_client, + const bool is_hierarchical_namespace_enabled, const io::IOContext& io_context, + const AzurePath& path, const std::shared_ptr& metadata) + : path_client_(std::move(path_client)), + file_client_(std::move(file_client)), + blob_client_(std::move(blob_client)), + is_hierarchical_namespace_enabled_(is_hierarchical_namespace_enabled), + io_context_(io_context), + path_(path) {} + + ~ObjectAppendStream() override { + // For compliance with the rest of the IO stack, Close rather than Abort, + // even though it may be more expensive. 
+ io::internal::CloseFromDestructor(this); + } + + Status Init() { + closed_ = false; + if (content_length_ != kNoSize) { + DCHECK_GE(content_length_, 0); + return Status::OK(); + } + try { + auto properties = path_client_->GetProperties(); + if (properties.Value.IsDirectory) { + return ::arrow::fs::internal::NotAFile(path_.full_path); + } + content_length_ = properties.Value.FileSize; + pos_ = content_length_; + } catch (const Azure::Storage::StorageException& exception) { + // new file + if (is_hierarchical_namespace_enabled_) { + try { + file_client_->CreateIfNotExists(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + std::string s = ""; + try { + file_client_->UploadFrom( + const_cast(reinterpret_cast(s.data())), s.size()); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + content_length_ = 0; + } + return Status::OK(); + } + + Status Abort() override { + if (closed_) { + return Status::OK(); + } + path_client_ = nullptr; + file_client_ = nullptr; + blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + // OutputStream interface + + Status Close() override { + if (closed_) { + return Status::OK(); + } + path_client_ = nullptr; + file_client_ = nullptr; + blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Result Tell() const override { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + return pos_; + } + + Status Write(const std::shared_ptr& buffer) override { + return DoAppend(buffer->data(), buffer->size(), buffer); + } + + Status Write(const void* data, int64_t nbytes) override { + return DoAppend(data, nbytes); + } + + Status DoAppend(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + if (closed_) { + return 
Status::Invalid("Operation on closed stream"); + } + if (is_hierarchical_namespace_enabled_) { + try { + auto buffer_stream = std::make_unique( + Azure::Core::IO::MemoryBodyStream( + const_cast(reinterpret_cast(data)), nbytes)); + if (buffer_stream->Length() == 0) { + return Status::OK(); + } + auto result = file_client_->Append(*buffer_stream, pos_); + pos_ += nbytes; + file_client_->Flush(pos_); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + try { + auto append_data = static_cast((void*)data); + auto res = blob_client_->GetBlockList().Value; + auto size = res.CommittedBlocks.size(); + std::string block_id; + { + block_id = std::to_string(size + 1); + size_t n = 8; + int precision = n - std::min(n, block_id.size()); + block_id.insert(0, precision, '0'); + } + block_id = Azure::Core::Convert::Base64Encode( + std::vector(block_id.begin(), block_id.end())); + auto block_content = Azure::Core::IO::MemoryBodyStream( + append_data, strlen(reinterpret_cast(append_data))); + if (block_content.Length() == 0) { + return Status::OK(); + } + blob_client_->StageBlock(block_id, block_content); + std::vector block_ids; + for (auto block : res.CommittedBlocks) { + block_ids.push_back(block.Name); + } + block_ids.push_back(block_id); + blob_client_->CommitBlockList(block_ids); + pos_ += nbytes; + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + content_length_ += nbytes; + return Status::OK(); + } + + Status Flush() override { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + if (is_hierarchical_namespace_enabled_) { + try { + file_client_->Flush(content_length_); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + try { + auto res = 
blob_client_->GetBlockList().Value; + std::vector block_ids; + for (auto block : res.UncommittedBlocks) { + block_ids.push_back(block.Name); + } + blob_client_->CommitBlockList(block_ids); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + return Status::OK(); + } + + protected: + std::shared_ptr path_client_; + std::shared_ptr file_client_; + std::shared_ptr blob_client_; + const bool is_hierarchical_namespace_enabled_; + const io::IOContext io_context_; + const AzurePath path_; + + bool closed_ = true; + int64_t pos_ = 0; + int64_t content_length_ = kNoSize; +}; + } // namespace // ----------------------------------------------------------------------- From 12c1978797379163e040746afeeb057f784386b7 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 9 Nov 2023 01:41:06 +0000 Subject: [PATCH 02/26] Delete hierarchical namespace code paths --- cpp/src/arrow/filesystem/azurefs.cc | 146 ++++++++++------------------ 1 file changed, 50 insertions(+), 96 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 0cfe30c58fd7d..dfd00e5a91a99 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -24,6 +24,7 @@ #include "arrow/buffer.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/util_internal.h" +#include "arrow/io/interfaces.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" #include "arrow/util/formatting.h" @@ -464,18 +465,11 @@ class ObjectInputFile final : public io::RandomAccessFile { class ObjectAppendStream final : public io::OutputStream { public: - ObjectAppendStream( - std::shared_ptr& path_client, - std::shared_ptr& file_client, - std::shared_ptr& blob_client, - const bool is_hierarchical_namespace_enabled, const io::IOContext& io_context, - const AzurePath& path, const std::shared_ptr& metadata) - : path_client_(std::move(path_client)), - 
file_client_(std::move(file_client)), - blob_client_(std::move(blob_client)), - is_hierarchical_namespace_enabled_(is_hierarchical_namespace_enabled), - io_context_(io_context), - path_(path) {} + ObjectAppendStream(std::shared_ptr blob_client, + const bool is_hierarchical_namespace_enabled, + const io::IOContext& io_context, const AzurePath& path, + const std::shared_ptr& metadata) + : blob_client_(std::move(blob_client)), io_context_(io_context), path_(path) {} ~ObjectAppendStream() override { // For compliance with the rest of the IO stack, Close rather than Abort, @@ -490,28 +484,19 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } try { - auto properties = path_client_->GetProperties(); - if (properties.Value.IsDirectory) { - return ::arrow::fs::internal::NotAFile(path_.full_path); - } - content_length_ = properties.Value.FileSize; + auto properties = blob_client_->GetProperties(); + // TODO: Consider adding a check for whether its a directory. + content_length_ = properties.Value.BlobSize; pos_ = content_length_; } catch (const Azure::Storage::StorageException& exception) { // new file - if (is_hierarchical_namespace_enabled_) { - try { - file_client_->CreateIfNotExists(); - } catch (const Azure::Storage::StorageException& exception) { - return Status::IOError(exception.RawResponse->GetReasonPhrase()); - } - } else { - std::string s = ""; - try { - file_client_->UploadFrom( - const_cast(reinterpret_cast(s.data())), s.size()); - } catch (const Azure::Storage::StorageException& exception) { - return Status::IOError(exception.RawResponse->GetReasonPhrase()); - } + + std::string s = ""; + try { + blob_client_->UploadFrom( + const_cast(reinterpret_cast(s.data())), s.size()); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); } content_length_ = 0; } @@ -522,8 +507,6 @@ class ObjectAppendStream final : public io::OutputStream { if (closed_) { return 
Status::OK(); } - path_client_ = nullptr; - file_client_ = nullptr; blob_client_ = nullptr; closed_ = true; return Status::OK(); @@ -535,8 +518,6 @@ class ObjectAppendStream final : public io::OutputStream { if (closed_) { return Status::OK(); } - path_client_ = nullptr; - file_client_ = nullptr; blob_client_ = nullptr; closed_ = true; return Status::OK(); @@ -564,50 +545,34 @@ class ObjectAppendStream final : public io::OutputStream { if (closed_) { return Status::Invalid("Operation on closed stream"); } - if (is_hierarchical_namespace_enabled_) { - try { - auto buffer_stream = std::make_unique( - Azure::Core::IO::MemoryBodyStream( - const_cast(reinterpret_cast(data)), nbytes)); - if (buffer_stream->Length() == 0) { - return Status::OK(); - } - auto result = file_client_->Append(*buffer_stream, pos_); - pos_ += nbytes; - file_client_->Flush(pos_); - } catch (const Azure::Storage::StorageException& exception) { - return Status::IOError(exception.RawResponse->GetReasonPhrase()); + try { + auto append_data = static_cast((void*)data); + auto res = blob_client_->GetBlockList().Value; + auto size = res.CommittedBlocks.size(); + std::string block_id; + { + block_id = std::to_string(size + 1); + size_t n = 8; + int precision = n - std::min(n, block_id.size()); + block_id.insert(0, precision, '0'); } - } else { - try { - auto append_data = static_cast((void*)data); - auto res = blob_client_->GetBlockList().Value; - auto size = res.CommittedBlocks.size(); - std::string block_id; - { - block_id = std::to_string(size + 1); - size_t n = 8; - int precision = n - std::min(n, block_id.size()); - block_id.insert(0, precision, '0'); - } - block_id = Azure::Core::Convert::Base64Encode( - std::vector(block_id.begin(), block_id.end())); - auto block_content = Azure::Core::IO::MemoryBodyStream( - append_data, strlen(reinterpret_cast(append_data))); - if (block_content.Length() == 0) { - return Status::OK(); - } - blob_client_->StageBlock(block_id, block_content); - std::vector 
block_ids; - for (auto block : res.CommittedBlocks) { - block_ids.push_back(block.Name); - } - block_ids.push_back(block_id); - blob_client_->CommitBlockList(block_ids); - pos_ += nbytes; - } catch (const Azure::Storage::StorageException& exception) { - return Status::IOError(exception.RawResponse->GetReasonPhrase()); + block_id = Azure::Core::Convert::Base64Encode( + std::vector(block_id.begin(), block_id.end())); + auto block_content = Azure::Core::IO::MemoryBodyStream( + append_data, strlen(reinterpret_cast(append_data))); + if (block_content.Length() == 0) { + return Status::OK(); } + blob_client_->StageBlock(block_id, block_content); + std::vector block_ids; + for (auto block : res.CommittedBlocks) { + block_ids.push_back(block.Name); + } + block_ids.push_back(block_id); + blob_client_->CommitBlockList(block_ids); + pos_ += nbytes; + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); } content_length_ += nbytes; return Status::OK(); @@ -617,32 +582,21 @@ class ObjectAppendStream final : public io::OutputStream { if (closed_) { return Status::Invalid("Operation on closed stream"); } - if (is_hierarchical_namespace_enabled_) { - try { - file_client_->Flush(content_length_); - } catch (const Azure::Storage::StorageException& exception) { - return Status::IOError(exception.RawResponse->GetReasonPhrase()); - } - } else { - try { - auto res = blob_client_->GetBlockList().Value; - std::vector block_ids; - for (auto block : res.UncommittedBlocks) { - block_ids.push_back(block.Name); - } - blob_client_->CommitBlockList(block_ids); - } catch (const Azure::Storage::StorageException& exception) { - return Status::IOError(exception.RawResponse->GetReasonPhrase()); + try { + auto res = blob_client_->GetBlockList().Value; + std::vector block_ids; + for (auto block : res.UncommittedBlocks) { + block_ids.push_back(block.Name); } + blob_client_->CommitBlockList(block_ids); + } catch (const 
Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); } return Status::OK(); } protected: - std::shared_ptr path_client_; - std::shared_ptr file_client_; std::shared_ptr blob_client_; - const bool is_hierarchical_namespace_enabled_; const io::IOContext io_context_; const AzurePath path_; From bd787cc9987eb2d9c82e763b66084950425cbc16 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 15 Nov 2023 08:40:04 +0000 Subject: [PATCH 03/26] Paste in tests from gcsfs_test.cc --- cpp/src/arrow/filesystem/azurefs_test.cc | 124 +++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index ecf0a19f684eb..4762a9bdca9de 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -647,6 +647,130 @@ TEST_F(AzuriteFileSystemTest, OpenInputStreamClosed) { ASSERT_RAISES(Invalid, stream->Tell()); } + +TEST_F(GcsIntegrationTest, TestWriteWithDefaults) { + auto options = TestGcsOptions(); + options.default_bucket_location = "utopia"; + options.default_metadata = arrow::key_value_metadata({{"foo", "bar"}}); + auto fs = GcsFileSystem::Make(options); + std::string bucket = "new_bucket_with_default_location"; + auto file_name = "object_with_defaults"; + ASSERT_OK(fs->CreateDir(bucket, /*recursive=*/false)); + const auto path = bucket + "/" + file_name; + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, /*metadata=*/{})); + const auto expected = std::string(kLoremIpsum); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. 
+ std::shared_ptr input; + ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + + std::array inbuf{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + + EXPECT_EQ(std::string(inbuf.data(), size), expected); + auto object = GcsClient().GetObjectMetadata(bucket, file_name); + ASSERT_TRUE(object.ok()) << "status=" << object.status(); + EXPECT_EQ(object->mutable_metadata()["foo"], "bar"); + auto bucket_info = GcsClient().GetBucketMetadata(bucket); + ASSERT_TRUE(bucket_info.ok()) << "status=" << object.status(); + EXPECT_EQ(bucket_info->location(), "utopia"); + + // Check that explicit metadata overrides the defaults. + ASSERT_OK_AND_ASSIGN( + output, fs->OpenOutputStream( + path, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}}))); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + object = GcsClient().GetObjectMetadata(bucket, file_name); + ASSERT_TRUE(object.ok()) << "status=" << object.status(); + EXPECT_EQ(object->mutable_metadata()["bar"], "foo"); + // Defaults are overwritten and not merged. + EXPECT_FALSE(object->has_metadata("foo")); +} + +TEST_F(GcsIntegrationTest, OpenOutputStreamSmall) { + auto fs = GcsFileSystem::Make(TestGcsOptions()); + + const auto path = PreexistingBucketPath() + "test-write-object"; + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + const auto expected = std::string(kLoremIpsum); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. 
+ std::shared_ptr input; + ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + + std::array inbuf{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + + EXPECT_EQ(std::string(inbuf.data(), size), expected); +} + +TEST_F(GcsIntegrationTest, OpenOutputStreamLarge) { + auto fs = GcsFileSystem::Make(TestGcsOptions()); + + const auto path = PreexistingBucketPath() + "test-write-object"; + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + // These buffer sizes are intentionally not multiples of the upload quantum (256 KiB). + std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; + std::array buffers{ + std::string(sizes[0], 'A'), + std::string(sizes[1], 'B'), + std::string(sizes[2], 'C'), + }; + auto expected = std::int64_t{0}; + for (auto i = 0; i != 3; ++i) { + ASSERT_OK(output->Write(buffers[i].data(), buffers[i].size())); + expected += sizes[i]; + ASSERT_EQ(output->Tell(), expected); + } + ASSERT_OK(output->Close()); + + // Verify we can read the object back. 
+ std::shared_ptr input; + ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + + std::string contents; + std::shared_ptr buffer; + do { + ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024)); + ASSERT_TRUE(buffer); + contents.append(buffer->ToString()); + } while (buffer->size() != 0); + + EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]); +} + +TEST_F(GcsIntegrationTest, OpenOutputStreamClosed) { + auto fs = GcsFileSystem::Make(TestGcsOptions()); + + const auto path = internal::ConcatAbstractPath(PreexistingBucketName(), + "open-output-stream-closed.txt"); + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + ASSERT_OK(output->Close()); + ASSERT_RAISES(Invalid, output->Write(kLoremIpsum, std::strlen(kLoremIpsum))); + ASSERT_RAISES(Invalid, output->Flush()); + ASSERT_RAISES(Invalid, output->Tell()); +} + +TEST_F(GcsIntegrationTest, OpenOutputStreamUri) { + auto fs = GcsFileSystem::Make(TestGcsOptions()); + + const auto path = + internal::ConcatAbstractPath(PreexistingBucketName(), "open-output-stream-uri.txt"); + ASSERT_RAISES(Invalid, fs->OpenInputStream("gs://" + path)); +} + + TEST_F(AzuriteFileSystemTest, OpenInputFileMixedReadVsReadAt) { // Create a file large enough to make the random access tests non-trivial. 
auto constexpr kLineWidth = 100; From cb9879b82b6f8be6c8dd37aed94ea82197d7dcdf Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 15 Nov 2023 08:48:14 +0000 Subject: [PATCH 04/26] Mostly correct tests --- cpp/src/arrow/filesystem/azurefs_test.cc | 128 +++++++++++------------ 1 file changed, 59 insertions(+), 69 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 4762a9bdca9de..e8d1db1129051 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -647,78 +647,73 @@ TEST_F(AzuriteFileSystemTest, OpenInputStreamClosed) { ASSERT_RAISES(Invalid, stream->Tell()); } - -TEST_F(GcsIntegrationTest, TestWriteWithDefaults) { - auto options = TestGcsOptions(); - options.default_bucket_location = "utopia"; - options.default_metadata = arrow::key_value_metadata({{"foo", "bar"}}); - auto fs = GcsFileSystem::Make(options); - std::string bucket = "new_bucket_with_default_location"; - auto file_name = "object_with_defaults"; - ASSERT_OK(fs->CreateDir(bucket, /*recursive=*/false)); - const auto path = bucket + "/" + file_name; +// TEST_F(AzuriteFileSystemTest, TestWriteWithDefaults) { +// auto options = TestGcsOptions(); +// options.default_bucket_location = "utopia"; +// options.default_metadata = arrow::key_value_metadata({{"foo", "bar"}}); +// auto fs = GcsFileSystem::Make(options); +// std::string bucket = "new_bucket_with_default_location"; +// auto file_name = "object_with_defaults"; +// ASSERT_OK(fs->CreateDir(bucket, /*recursive=*/false)); +// const auto path = bucket + "/" + file_name; +// std::shared_ptr output; +// ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, /*metadata=*/{})); +// const auto expected = std::string(kLoremIpsum); +// ASSERT_OK(output->Write(expected.data(), expected.size())); +// ASSERT_OK(output->Close()); + +// // Verify we can read the object back. 
+// std::shared_ptr input; +// ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + +// std::array inbuf{}; +// std::int64_t size; +// ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + +// EXPECT_EQ(std::string(inbuf.data(), size), expected); +// auto object = GcsClient().GetObjectMetadata(bucket, file_name); +// ASSERT_TRUE(object.ok()) << "status=" << object.status(); +// EXPECT_EQ(object->mutable_metadata()["foo"], "bar"); +// auto bucket_info = GcsClient().GetBucketMetadata(bucket); +// ASSERT_TRUE(bucket_info.ok()) << "status=" << object.status(); +// EXPECT_EQ(bucket_info->location(), "utopia"); + +// // Check that explicit metadata overrides the defaults. +// ASSERT_OK_AND_ASSIGN( +// output, fs->OpenOutputStream( +// path, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}}))); +// ASSERT_OK(output->Write(expected.data(), expected.size())); +// ASSERT_OK(output->Close()); +// object = GcsClient().GetObjectMetadata(bucket, file_name); +// ASSERT_TRUE(object.ok()) << "status=" << object.status(); +// EXPECT_EQ(object->mutable_metadata()["bar"], "foo"); +// // Defaults are overwritten and not merged. +// EXPECT_FALSE(object->has_metadata("foo")); +// } + +TEST_F(AzuriteFileSystemTest, OpenOutputStreamSmall) { + const auto path = PreexistingContainerPath() + "test-write-object"; std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, /*metadata=*/{})); + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); const auto expected = std::string(kLoremIpsum); ASSERT_OK(output->Write(expected.data(), expected.size())); ASSERT_OK(output->Close()); // Verify we can read the object back. 
std::shared_ptr input; - ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); std::array inbuf{}; std::int64_t size; ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); EXPECT_EQ(std::string(inbuf.data(), size), expected); - auto object = GcsClient().GetObjectMetadata(bucket, file_name); - ASSERT_TRUE(object.ok()) << "status=" << object.status(); - EXPECT_EQ(object->mutable_metadata()["foo"], "bar"); - auto bucket_info = GcsClient().GetBucketMetadata(bucket); - ASSERT_TRUE(bucket_info.ok()) << "status=" << object.status(); - EXPECT_EQ(bucket_info->location(), "utopia"); - - // Check that explicit metadata overrides the defaults. - ASSERT_OK_AND_ASSIGN( - output, fs->OpenOutputStream( - path, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}}))); - ASSERT_OK(output->Write(expected.data(), expected.size())); - ASSERT_OK(output->Close()); - object = GcsClient().GetObjectMetadata(bucket, file_name); - ASSERT_TRUE(object.ok()) << "status=" << object.status(); - EXPECT_EQ(object->mutable_metadata()["bar"], "foo"); - // Defaults are overwritten and not merged. - EXPECT_FALSE(object->has_metadata("foo")); } -TEST_F(GcsIntegrationTest, OpenOutputStreamSmall) { - auto fs = GcsFileSystem::Make(TestGcsOptions()); - - const auto path = PreexistingBucketPath() + "test-write-object"; +TEST_F(AzuriteFileSystemTest, OpenOutputStreamLarge) { + const auto path = PreexistingContainerPath() + "test-write-object"; std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); - const auto expected = std::string(kLoremIpsum); - ASSERT_OK(output->Write(expected.data(), expected.size())); - ASSERT_OK(output->Close()); - - // Verify we can read the object back. 
- std::shared_ptr input; - ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); - - std::array inbuf{}; - std::int64_t size; - ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); - - EXPECT_EQ(std::string(inbuf.data(), size), expected); -} - -TEST_F(GcsIntegrationTest, OpenOutputStreamLarge) { - auto fs = GcsFileSystem::Make(TestGcsOptions()); - - const auto path = PreexistingBucketPath() + "test-write-object"; - std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); // These buffer sizes are intentionally not multiples of the upload quantum (256 KiB). std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; std::array buffers{ @@ -736,7 +731,7 @@ TEST_F(GcsIntegrationTest, OpenOutputStreamLarge) { // Verify we can read the object back. std::shared_ptr input; - ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); std::string contents; std::shared_ptr buffer; @@ -749,28 +744,23 @@ TEST_F(GcsIntegrationTest, OpenOutputStreamLarge) { EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]); } -TEST_F(GcsIntegrationTest, OpenOutputStreamClosed) { - auto fs = GcsFileSystem::Make(TestGcsOptions()); - - const auto path = internal::ConcatAbstractPath(PreexistingBucketName(), +TEST_F(AzuriteFileSystemTest, OpenOutputStreamClosed) { + const auto path = internal::ConcatAbstractPath(PreexistingContainerName(), "open-output-stream-closed.txt"); std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); ASSERT_OK(output->Close()); ASSERT_RAISES(Invalid, output->Write(kLoremIpsum, std::strlen(kLoremIpsum))); ASSERT_RAISES(Invalid, output->Flush()); ASSERT_RAISES(Invalid, output->Tell()); } -TEST_F(GcsIntegrationTest, OpenOutputStreamUri) { - auto fs = GcsFileSystem::Make(TestGcsOptions()); - - const auto path = - 
internal::ConcatAbstractPath(PreexistingBucketName(), "open-output-stream-uri.txt"); - ASSERT_RAISES(Invalid, fs->OpenInputStream("gs://" + path)); +TEST_F(AzuriteFileSystemTest, OpenOutputStreamUri) { + const auto path = internal::ConcatAbstractPath(PreexistingContainerName(), + "open-output-stream-uri.txt"); + ASSERT_RAISES(Invalid, fs_->OpenInputStream("abfs://" + path)); } - TEST_F(AzuriteFileSystemTest, OpenInputFileMixedReadVsReadAt) { // Create a file large enough to make the random access tests non-trivial. auto constexpr kLineWidth = 100; From bbaf1385a5ef565713ad45c89aa19acbea66c451 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 15 Nov 2023 09:02:47 +0000 Subject: [PATCH 05/26] Paste in OpenOutputStream and OpenAppendStream from #12914 --- cpp/src/arrow/filesystem/azurefs.cc | 124 +++++++++++++++++++++++++++- 1 file changed, 122 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index dfd00e5a91a99..9d50e1c47a8ae 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -24,7 +24,7 @@ #include "arrow/buffer.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/util_internal.h" -#include "arrow/io/interfaces.h" +#include "arrow/io/util_internal.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" #include "arrow/util/formatting.h" @@ -868,6 +868,126 @@ class AzureFileSystem::Impl { return Status::OK(); } + + Result> OpenOutputStream( + const std::string& s, const std::shared_ptr& metadata, + AzureBlobFileSystem* fs) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + + if (path.empty() || path.path_to_file.empty()) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::string endpoint_url = dfs_endpoint_url_; + if (!is_hierarchical_namespace_enabled_) { + if (path.path_to_file_parts.size() > 1) { + return Status::IOError( + "Invalid path provided," + " hierarchical namespace not 
enabled"); + } + endpoint_url = blob_endpoint_url_; + } + ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + path.full_path)); + if (response) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::shared_ptr file_client; + ARROW_ASSIGN_OR_RAISE( + file_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + std::shared_ptr blob_client; + ARROW_ASSIGN_OR_RAISE( + blob_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + if (path.has_parent()) { + AzurePath parent_path = path.parent(); + if (parent_path.path_to_file.empty()) { + ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } else { + ARROW_ASSIGN_OR_RAISE(response, + DirExists(dfs_endpoint_url_ + parent_path.full_path)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } + } + auto ptr = std::make_shared(file_client, blob_client, + is_hierarchical_namespace_enabled_, + fs->io_context(), path, metadata); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + Result> OpenAppendStream( + const std::string& s, const std::shared_ptr& metadata, + AzureBlobFileSystem* fs) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + + if (path.empty() || path.path_to_file.empty()) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::string endpoint_url = dfs_endpoint_url_; + if (!is_hierarchical_namespace_enabled_) { + if (path.path_to_file_parts.size() > 1) { + return Status::IOError( + "Invalid Azure Blob Storage path provided," + " hierarchical namespace not enabled in storage account"); + } + endpoint_url = blob_endpoint_url_; + } + ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + 
path.full_path)); + if (response) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::shared_ptr path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + std::shared_ptr file_client; + ARROW_ASSIGN_OR_RAISE( + file_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + std::shared_ptr blob_client; + ARROW_ASSIGN_OR_RAISE( + blob_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + if (path.has_parent()) { + AzurePath parent_path = path.parent(); + if (parent_path.path_to_file.empty()) { + ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } else { + ARROW_ASSIGN_OR_RAISE(response, + DirExists(dfs_endpoint_url_ + parent_path.full_path)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } + } + auto ptr = std::make_shared(path_client, file_client, blob_client, + is_hierarchical_namespace_enabled_, + fs->io_context(), path, metadata); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } }; const AzureOptions& AzureFileSystem::options() const { return impl_->options(); } @@ -949,7 +1069,7 @@ Result> AzureFileSystem::OpenInputFile( Result> AzureFileSystem::OpenOutputStream( const std::string& path, const std::shared_ptr& metadata) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + // return } Result> AzureFileSystem::OpenAppendStream( From 2f27fbb64e92b65ad9a0944fbb619636edefce66 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 16 Nov 2023 22:50:15 +0000 Subject: [PATCH 06/26] Tests pass --- cpp/src/arrow/filesystem/azurefs.cc | 210 ++++++++++++---------------- 1 file 
changed, 88 insertions(+), 122 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 9d50e1c47a8ae..2bcaa26c8fd2e 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -465,11 +465,10 @@ class ObjectInputFile final : public io::RandomAccessFile { class ObjectAppendStream final : public io::OutputStream { public: - ObjectAppendStream(std::shared_ptr blob_client, - const bool is_hierarchical_namespace_enabled, + ObjectAppendStream(std::shared_ptr block_blob_client, const io::IOContext& io_context, const AzurePath& path, const std::shared_ptr& metadata) - : blob_client_(std::move(blob_client)), io_context_(io_context), path_(path) {} + : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), path_(path) {} ~ObjectAppendStream() override { // For compliance with the rest of the IO stack, Close rather than Abort, @@ -484,7 +483,7 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } try { - auto properties = blob_client_->GetProperties(); + auto properties = block_blob_client_->GetProperties(); // TODO: Consider adding a check for whether its a directory. 
content_length_ = properties.Value.BlobSize; pos_ = content_length_; @@ -493,7 +492,7 @@ class ObjectAppendStream final : public io::OutputStream { std::string s = ""; try { - blob_client_->UploadFrom( + block_blob_client_->UploadFrom( const_cast(reinterpret_cast(s.data())), s.size()); } catch (const Azure::Storage::StorageException& exception) { return Status::IOError(exception.RawResponse->GetReasonPhrase()); @@ -507,7 +506,7 @@ class ObjectAppendStream final : public io::OutputStream { if (closed_) { return Status::OK(); } - blob_client_ = nullptr; + block_blob_client_ = nullptr; closed_ = true; return Status::OK(); } @@ -518,7 +517,7 @@ class ObjectAppendStream final : public io::OutputStream { if (closed_) { return Status::OK(); } - blob_client_ = nullptr; + block_blob_client_ = nullptr; closed_ = true; return Status::OK(); } @@ -547,7 +546,7 @@ class ObjectAppendStream final : public io::OutputStream { } try { auto append_data = static_cast((void*)data); - auto res = blob_client_->GetBlockList().Value; + auto res = block_blob_client_->GetBlockList().Value; auto size = res.CommittedBlocks.size(); std::string block_id; { @@ -563,13 +562,13 @@ class ObjectAppendStream final : public io::OutputStream { if (block_content.Length() == 0) { return Status::OK(); } - blob_client_->StageBlock(block_id, block_content); + block_blob_client_->StageBlock(block_id, block_content); std::vector block_ids; for (auto block : res.CommittedBlocks) { block_ids.push_back(block.Name); } block_ids.push_back(block_id); - blob_client_->CommitBlockList(block_ids); + block_blob_client_->CommitBlockList(block_ids); pos_ += nbytes; } catch (const Azure::Storage::StorageException& exception) { return Status::IOError(exception.RawResponse->GetReasonPhrase()); @@ -583,12 +582,12 @@ class ObjectAppendStream final : public io::OutputStream { return Status::Invalid("Operation on closed stream"); } try { - auto res = blob_client_->GetBlockList().Value; + auto res = 
block_blob_client_->GetBlockList().Value; std::vector block_ids; for (auto block : res.UncommittedBlocks) { block_ids.push_back(block.Name); } - blob_client_->CommitBlockList(block_ids); + block_blob_client_->CommitBlockList(block_ids); } catch (const Azure::Storage::StorageException& exception) { return Status::IOError(exception.RawResponse->GetReasonPhrase()); } @@ -596,7 +595,7 @@ class ObjectAppendStream final : public io::OutputStream { } protected: - std::shared_ptr blob_client_; + std::shared_ptr block_blob_client_; const io::IOContext io_context_; const AzurePath path_; @@ -869,125 +868,92 @@ class AzureFileSystem::Impl { return Status::OK(); } - Result> OpenOutputStream( + Result> OpenOutputStream( const std::string& s, const std::shared_ptr& metadata, - AzureBlobFileSystem* fs) { + AzureFileSystem* fs) { ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + // TODO: Ensure cheap checks which don't require a call to Azure are done. if (path.empty() || path.path_to_file.empty()) { return ::arrow::fs::internal::PathNotFound(path.full_path); } - std::string endpoint_url = dfs_endpoint_url_; - if (!is_hierarchical_namespace_enabled_) { - if (path.path_to_file_parts.size() > 1) { - return Status::IOError( - "Invalid path provided," - " hierarchical namespace not enabled"); - } - endpoint_url = blob_endpoint_url_; - } - ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + path.full_path)); - if (response) { - return ::arrow::fs::internal::PathNotFound(path.full_path); - } - std::shared_ptr file_client; - ARROW_ASSIGN_OR_RAISE( - file_client, - InitPathClient( - options_, endpoint_url + path.full_path, path.container, path.path_to_file)); - - std::shared_ptr blob_client; - ARROW_ASSIGN_OR_RAISE( - blob_client, - InitPathClient( - options_, endpoint_url + path.full_path, path.container, path.path_to_file)); - - if (path.has_parent()) { - AzurePath parent_path = path.parent(); - if (parent_path.path_to_file.empty()) { - 
ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); - if (!response) { - return Status::IOError("Cannot write to file '", path.full_path, - "': parent directory does not exist"); - } - } else { - ARROW_ASSIGN_OR_RAISE(response, - DirExists(dfs_endpoint_url_ + parent_path.full_path)); - if (!response) { - return Status::IOError("Cannot write to file '", path.full_path, - "': parent directory does not exist"); - } - } - } - auto ptr = std::make_shared(file_client, blob_client, - is_hierarchical_namespace_enabled_, - fs->io_context(), path, metadata); - RETURN_NOT_OK(ptr->Init()); - return ptr; - } - Result> OpenAppendStream( - const std::string& s, const std::shared_ptr& metadata, - AzureBlobFileSystem* fs) { - ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + auto block_blob_client = std::make_shared( + blob_service_client_->GetBlobContainerClient(path.container) + .GetBlockBlobClient(path.path_to_file)); - if (path.empty() || path.path_to_file.empty()) { - return ::arrow::fs::internal::PathNotFound(path.full_path); - } - std::string endpoint_url = dfs_endpoint_url_; - if (!is_hierarchical_namespace_enabled_) { - if (path.path_to_file_parts.size() > 1) { - return Status::IOError( - "Invalid Azure Blob Storage path provided," - " hierarchical namespace not enabled in storage account"); - } - endpoint_url = blob_endpoint_url_; - } - ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + path.full_path)); - if (response) { - return ::arrow::fs::internal::PathNotFound(path.full_path); - } - std::shared_ptr path_client; - ARROW_ASSIGN_OR_RAISE( - path_client, - InitPathClient( - options_, endpoint_url + path.full_path, path.container, path.path_to_file)); - - std::shared_ptr file_client; - ARROW_ASSIGN_OR_RAISE( - file_client, - InitPathClient( - options_, endpoint_url + path.full_path, path.container, path.path_to_file)); - - std::shared_ptr blob_client; - ARROW_ASSIGN_OR_RAISE( - blob_client, - InitPathClient( - options_, 
endpoint_url + path.full_path, path.container, path.path_to_file)); - - if (path.has_parent()) { - AzurePath parent_path = path.parent(); - if (parent_path.path_to_file.empty()) { - ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); - if (!response) { - return Status::IOError("Cannot write to file '", path.full_path, - "': parent directory does not exist"); - } - } else { - ARROW_ASSIGN_OR_RAISE(response, - DirExists(dfs_endpoint_url_ + parent_path.full_path)); - if (!response) { - return Status::IOError("Cannot write to file '", path.full_path, - "': parent directory does not exist"); - } - } - } - auto ptr = std::make_shared(path_client, file_client, blob_client, - is_hierarchical_namespace_enabled_, - fs->io_context(), path, metadata); + auto ptr = std::make_shared(block_blob_client, fs->io_context(), path, + metadata); RETURN_NOT_OK(ptr->Init()); return ptr; } + + // Result> OpenAppendStream( + // const std::string& s, const std::shared_ptr& metadata, + // AzureBlobFileSystem* fs) { + // ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + + // if (path.empty() || path.path_to_file.empty()) { + // return ::arrow::fs::internal::PathNotFound(path.full_path); + // } + // std::string endpoint_url = dfs_endpoint_url_; + // if (!is_hierarchical_namespace_enabled_) { + // if (path.path_to_file_parts.size() > 1) { + // return Status::IOError( + // "Invalid Azure Blob Storage path provided," + // " hierarchical namespace not enabled in storage account"); + // } + // endpoint_url = blob_endpoint_url_; + // } + // ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + + // path.full_path)); if (response) { + // return ::arrow::fs::internal::PathNotFound(path.full_path); + // } + // std::shared_ptr path_client; + // ARROW_ASSIGN_OR_RAISE( + // path_client, + // InitPathClient( + // options_, endpoint_url + path.full_path, path.container, + // path.path_to_file)); + + // std::shared_ptr file_client; + // ARROW_ASSIGN_OR_RAISE( + // 
file_client, + // InitPathClient( + // options_, endpoint_url + path.full_path, path.container, + // path.path_to_file)); + + // std::shared_ptr blob_client; + // ARROW_ASSIGN_OR_RAISE( + // blob_client, + // InitPathClient( + // options_, endpoint_url + path.full_path, path.container, + // path.path_to_file)); + + // if (path.has_parent()) { + // AzurePath parent_path = path.parent(); + // if (parent_path.path_to_file.empty()) { + // ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); + // if (!response) { + // return Status::IOError("Cannot write to file '", path.full_path, + // "': parent directory does not exist"); + // } + // } else { + // ARROW_ASSIGN_OR_RAISE(response, + // DirExists(dfs_endpoint_url_ + parent_path.full_path)); + // if (!response) { + // return Status::IOError("Cannot write to file '", path.full_path, + // "': parent directory does not exist"); + // } + // } + // } + // auto ptr = std::make_shared(path_client, file_client, + // blob_client, + // is_hierarchical_namespace_enabled_, + // fs->io_context(), path, metadata); + // RETURN_NOT_OK(ptr->Init()); + // return ptr; + // } }; const AzureOptions& AzureFileSystem::options() const { return impl_->options(); } @@ -1069,7 +1035,7 @@ Result> AzureFileSystem::OpenInputFile( Result> AzureFileSystem::OpenOutputStream( const std::string& path, const std::shared_ptr& metadata) { - // return + return impl_->OpenOutputStream(path, metadata, this); } Result> AzureFileSystem::OpenAppendStream( From bfeb57d7e822a55b9cf74379b739e0a59ac5e125 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Thu, 16 Nov 2023 23:46:35 +0000 Subject: [PATCH 07/26] Better error handling and tidy --- cpp/src/arrow/filesystem/azurefs.cc | 79 +++++++++++++++++------------ 1 file changed, 46 insertions(+), 33 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 2bcaa26c8fd2e..0ee5bda18037c 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ 
b/cpp/src/arrow/filesystem/azurefs.cc @@ -463,12 +463,33 @@ class ObjectInputFile final : public io::RandomAccessFile { std::shared_ptr metadata_; }; +Status CreateEmptyBlockBlob( + std::shared_ptr block_blob_client) { + std::string s = ""; + try { + block_blob_client->UploadFrom( + const_cast(reinterpret_cast(s.data())), s.size()); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "UploadFrom failed for '" + block_blob_client->GetUrl() + + "' with an unexpected Azure error. There is new existing blob at this " + "path " + "so ObjectAppendStream must create a new empty block blob.", + exception); + } + return Status::OK(); +} + class ObjectAppendStream final : public io::OutputStream { public: - ObjectAppendStream(std::shared_ptr block_blob_client, - const io::IOContext& io_context, const AzurePath& path, - const std::shared_ptr& metadata) - : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), path_(path) {} + ObjectAppendStream( + std::shared_ptr block_blob_client, + const io::IOContext& io_context, const AzurePath& path, + const std::shared_ptr& metadata, int64_t size = kNoSize) + : block_blob_client_(std::move(block_blob_client)), + io_context_(io_context), + path_(path), + content_length_(size) {} ~ObjectAppendStream() override { // For compliance with the rest of the IO stack, Close rather than Abort, @@ -477,7 +498,6 @@ class ObjectAppendStream final : public io::OutputStream { } Status Init() { - closed_ = false; if (content_length_ != kNoSize) { DCHECK_GE(content_length_, 0); return Status::OK(); @@ -488,21 +508,24 @@ class ObjectAppendStream final : public io::OutputStream { content_length_ = properties.Value.BlobSize; pos_ = content_length_; } catch (const Azure::Storage::StorageException& exception) { - // new file - - std::string s = ""; - try { - block_blob_client_->UploadFrom( - const_cast(reinterpret_cast(s.data())), s.size()); - } catch (const 
Azure::Storage::StorageException& exception) { - return Status::IOError(exception.RawResponse->GetReasonPhrase()); + if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_)); + } else { + return internal::ExceptionToStatus( + "GetProperties failed for '" + block_blob_client_->GetUrl() + + "' with an unexpected Azure error. Can not initialise an " + "ObjectAppendStream without knowing whether a file already exists at " + "this path.", + exception); } content_length_ = 0; } return Status::OK(); } - Status Abort() override { + Status Abort() override { return Close(); } + + Status Close() override { if (closed_) { return Status::OK(); } @@ -511,23 +534,17 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } - // OutputStream interface + bool closed() const override { return closed_; } - Status Close() override { + Status CheckClosed(const char* action) const { if (closed_) { - return Status::OK(); + return Status::Invalid("Cannot ", action, " on closed stream."); } - block_blob_client_ = nullptr; - closed_ = true; return Status::OK(); } - bool closed() const override { return closed_; } - Result Tell() const override { - if (closed_) { - return Status::Invalid("Operation on closed stream"); - } + RETURN_NOT_OK(CheckClosed("tell")); return pos_; } @@ -541,9 +558,7 @@ class ObjectAppendStream final : public io::OutputStream { Status DoAppend(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { - if (closed_) { - return Status::Invalid("Operation on closed stream"); - } + RETURN_NOT_OK(CheckClosed("append")); try { auto append_data = static_cast((void*)data); auto res = block_blob_client_->GetBlockList().Value; @@ -578,9 +593,7 @@ class ObjectAppendStream final : public io::OutputStream { } Status Flush() override { - if (closed_) { - return Status::Invalid("Operation on closed stream"); - } + RETURN_NOT_OK(CheckClosed("flush")); try { auto 
res = block_blob_client_->GetBlockList().Value; std::vector block_ids; @@ -599,7 +612,7 @@ class ObjectAppendStream final : public io::OutputStream { const io::IOContext io_context_; const AzurePath path_; - bool closed_ = true; + bool closed_ = false; int64_t pos_ = 0; int64_t content_length_ = kNoSize; }; @@ -882,8 +895,8 @@ class AzureFileSystem::Impl { blob_service_client_->GetBlobContainerClient(path.container) .GetBlockBlobClient(path.path_to_file)); - auto ptr = std::make_shared(block_blob_client, fs->io_context(), path, - metadata); + auto ptr = std::make_shared(block_blob_client, fs->io_context(), + path, metadata); RETURN_NOT_OK(ptr->Init()); return ptr; } From d4c53b2d3af063f5ce420709c38d5c062d2c0cec Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Fri, 17 Nov 2023 00:08:07 +0000 Subject: [PATCH 08/26] Implement append and output --- cpp/src/arrow/filesystem/azurefs.cc | 88 +++++------------------------ 1 file changed, 14 insertions(+), 74 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 0ee5bda18037c..629f4f2722ff7 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -881,9 +881,9 @@ class AzureFileSystem::Impl { return Status::OK(); } - Result> OpenOutputStream( + Result> OpenAppendStream( const std::string& s, const std::shared_ptr& metadata, - AzureFileSystem* fs) { + const bool truncate, AzureFileSystem* fs) { ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); // TODO: Ensure cheap checks which don't require a call to Azure are done. 
@@ -895,78 +895,18 @@ class AzureFileSystem::Impl { blob_service_client_->GetBlobContainerClient(path.container) .GetBlockBlobClient(path.path_to_file)); - auto ptr = std::make_shared(block_blob_client, fs->io_context(), - path, metadata); + std::shared_ptr ptr; + if (truncate) { + RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client)); + ptr = std::make_shared(block_blob_client, fs->io_context(), + path, metadata, 0); + } else { + ptr = std::make_shared(block_blob_client, fs->io_context(), + path, metadata); + } RETURN_NOT_OK(ptr->Init()); return ptr; } - - // Result> OpenAppendStream( - // const std::string& s, const std::shared_ptr& metadata, - // AzureBlobFileSystem* fs) { - // ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); - - // if (path.empty() || path.path_to_file.empty()) { - // return ::arrow::fs::internal::PathNotFound(path.full_path); - // } - // std::string endpoint_url = dfs_endpoint_url_; - // if (!is_hierarchical_namespace_enabled_) { - // if (path.path_to_file_parts.size() > 1) { - // return Status::IOError( - // "Invalid Azure Blob Storage path provided," - // " hierarchical namespace not enabled in storage account"); - // } - // endpoint_url = blob_endpoint_url_; - // } - // ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + - // path.full_path)); if (response) { - // return ::arrow::fs::internal::PathNotFound(path.full_path); - // } - // std::shared_ptr path_client; - // ARROW_ASSIGN_OR_RAISE( - // path_client, - // InitPathClient( - // options_, endpoint_url + path.full_path, path.container, - // path.path_to_file)); - - // std::shared_ptr file_client; - // ARROW_ASSIGN_OR_RAISE( - // file_client, - // InitPathClient( - // options_, endpoint_url + path.full_path, path.container, - // path.path_to_file)); - - // std::shared_ptr blob_client; - // ARROW_ASSIGN_OR_RAISE( - // blob_client, - // InitPathClient( - // options_, endpoint_url + path.full_path, path.container, - // path.path_to_file)); - - // if 
(path.has_parent()) { - // AzurePath parent_path = path.parent(); - // if (parent_path.path_to_file.empty()) { - // ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); - // if (!response) { - // return Status::IOError("Cannot write to file '", path.full_path, - // "': parent directory does not exist"); - // } - // } else { - // ARROW_ASSIGN_OR_RAISE(response, - // DirExists(dfs_endpoint_url_ + parent_path.full_path)); - // if (!response) { - // return Status::IOError("Cannot write to file '", path.full_path, - // "': parent directory does not exist"); - // } - // } - // } - // auto ptr = std::make_shared(path_client, file_client, - // blob_client, - // is_hierarchical_namespace_enabled_, - // fs->io_context(), path, metadata); - // RETURN_NOT_OK(ptr->Init()); - // return ptr; - // } }; const AzureOptions& AzureFileSystem::options() const { return impl_->options(); } @@ -1048,12 +988,12 @@ Result> AzureFileSystem::OpenInputFile( Result> AzureFileSystem::OpenOutputStream( const std::string& path, const std::shared_ptr& metadata) { - return impl_->OpenOutputStream(path, metadata, this); + return impl_->OpenAppendStream(path, metadata, true, this); } Result> AzureFileSystem::OpenAppendStream( - const std::string&, const std::shared_ptr&) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + const std::string& path, const std::shared_ptr& metadata) { + return impl_->OpenAppendStream(path, metadata, false, this); } Result> AzureFileSystem::Make( From a1a8c15c94bd2f8aadaf3177bc31ed8f01935282 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sat, 18 Nov 2023 16:00:51 +0000 Subject: [PATCH 09/26] Fix rebase --- cpp/src/arrow/filesystem/azurefs.cc | 33 +++++++++++++++-------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 629f4f2722ff7..04ad6303f2179 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ 
b/cpp/src/arrow/filesystem/azurefs.cc @@ -484,11 +484,11 @@ class ObjectAppendStream final : public io::OutputStream { public: ObjectAppendStream( std::shared_ptr block_blob_client, - const io::IOContext& io_context, const AzurePath& path, + const io::IOContext& io_context, const AzureLocation& location, const std::shared_ptr& metadata, int64_t size = kNoSize) : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), - path_(path), + location_(location), content_length_(size) {} ~ObjectAppendStream() override { @@ -610,7 +610,7 @@ class ObjectAppendStream final : public io::OutputStream { protected: std::shared_ptr block_blob_client_; const io::IOContext io_context_; - const AzurePath path_; + const AzureLocation location_; bool closed_ = false; int64_t pos_ = 0; @@ -880,29 +880,28 @@ class AzureFileSystem::Impl { return Status::OK(); } - - Result> OpenAppendStream( - const std::string& s, const std::shared_ptr& metadata, - const bool truncate, AzureFileSystem* fs) { - ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + Result> OpenAppendStream( + const AzureLocation& location, + const std::shared_ptr& metadata, const bool truncate, + AzureFileSystem* fs) { // TODO: Ensure cheap checks which don't require a call to Azure are done. 
- if (path.empty() || path.path_to_file.empty()) { - return ::arrow::fs::internal::PathNotFound(path.full_path); + if (location.empty() || location.path.empty()) { + return ::arrow::fs::internal::PathNotFound(location.path); } auto block_blob_client = std::make_shared( - blob_service_client_->GetBlobContainerClient(path.container) - .GetBlockBlobClient(path.path_to_file)); + blob_service_client_->GetBlobContainerClient(location.container) + .GetBlockBlobClient(location.path)); std::shared_ptr ptr; if (truncate) { RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client)); ptr = std::make_shared(block_blob_client, fs->io_context(), - path, metadata, 0); + location, metadata, 0); } else { ptr = std::make_shared(block_blob_client, fs->io_context(), - path, metadata); + location, metadata); } RETURN_NOT_OK(ptr->Init()); return ptr; @@ -988,12 +987,14 @@ Result> AzureFileSystem::OpenInputFile( Result> AzureFileSystem::OpenOutputStream( const std::string& path, const std::shared_ptr& metadata) { - return impl_->OpenAppendStream(path, metadata, true, this); + ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path)); + return impl_->OpenAppendStream(location, metadata, true, this); } Result> AzureFileSystem::OpenAppendStream( const std::string& path, const std::shared_ptr& metadata) { - return impl_->OpenAppendStream(path, metadata, false, this); + ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path)); + return impl_->OpenAppendStream(location, metadata, false, this); } Result> AzureFileSystem::Make( From 0b0a2ee406bb10a0c71d55f949f3a1596aae4996 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sat, 18 Nov 2023 16:31:27 +0000 Subject: [PATCH 10/26] Add tests to distinguish OpenAppendStream and OpenOutputStream --- cpp/src/arrow/filesystem/azurefs_test.cc | 58 ++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index e8d1db1129051..54c930fc0459e 
100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -744,6 +744,64 @@ TEST_F(AzuriteFileSystemTest, OpenOutputStreamLarge) { EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]); } +TEST_F(AzuriteFileSystemTest, OpenOutputStreamTruncatesExistingFile) { + const auto path = PreexistingContainerPath() + "test-write-object"; + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); + const std::string expected0 = "Existing blob content"; + ASSERT_OK(output->Write(expected0.data(), expected0.size())); + ASSERT_OK(output->Close()); + + // Check that the initial content has been written - if not this test is not achieving + // what it's meant to. + std::shared_ptr input; + ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + + std::array inbuf{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(std::string(inbuf.data(), size), expected0); + + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); + const auto expected1 = std::string(kLoremIpsum); + ASSERT_OK(output->Write(expected1.data(), expected1.size())); + ASSERT_OK(output->Close()); + + // Verify that the initial content has been overwritten. + ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(std::string(inbuf.data(), size), expected1); +} + +TEST_F(AzuriteFileSystemTest, OpenAppendStreamDoesNotTruncateExistingFile) { + const auto path = PreexistingContainerPath() + "test-write-object"; + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); + const std::string expected0 = "Existing blob content"; + ASSERT_OK(output->Write(expected0.data(), expected0.size())); + ASSERT_OK(output->Close()); + + // Check that the initial content has been written - if not this test is not achieving + // what it's meant to. 
+ std::shared_ptr input; + ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + + std::array inbuf{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(std::string(inbuf.data(), size), expected0); + + ASSERT_OK_AND_ASSIGN(output, fs_->OpenAppendStream(path, {})); + const auto expected1 = std::string(kLoremIpsum); + ASSERT_OK(output->Write(expected1.data(), expected1.size())); + ASSERT_OK(output->Close()); + + // Verify that the initial content has been overwritten. + ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(std::string(inbuf.data(), size), expected0 + expected1); +} + TEST_F(AzuriteFileSystemTest, OpenOutputStreamClosed) { const auto path = internal::ConcatAbstractPath(PreexistingContainerName(), "open-output-stream-closed.txt"); From c69f188d74b6bc9ef0f6e0214ffa52a11b94d068 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sat, 18 Nov 2023 17:07:52 +0000 Subject: [PATCH 11/26] More precise error handling on calls to Azure blob storage --- cpp/src/arrow/filesystem/azurefs.cc | 101 ++++++++++++++++++---------- 1 file changed, 66 insertions(+), 35 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 04ad6303f2179..6c4be25b7f635 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -480,6 +480,35 @@ Status CreateEmptyBlockBlob( return Status::OK(); } +Result GetBlockList( + std::shared_ptr block_blob_client) { + try { + // TODO: Can we avoid this call if we know that the file is empty? + return block_blob_client->GetBlockList().Value; + } catch (Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "GetBlockList failed for '" + block_blob_client->GetUrl() + + "' with an unexpected Azure error. 
Cannot write to a file without first " + "fetching the existing block list.", + exception); + } +} + +Status CommitBlockList( + std::shared_ptr block_blob_client, + std::vector block_ids) { + try { + block_blob_client->CommitBlockList(block_ids); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "CommitBlockList failed for '" + block_blob_client->GetUrl() + + "' with an unexpected Azure error. Committing the block list is " + "fundamental to streaming writes to blob storage.", + exception); + } + return Status::OK(); +} + class ObjectAppendStream final : public io::OutputStream { public: ObjectAppendStream( @@ -559,51 +588,53 @@ class ObjectAppendStream final : public io::OutputStream { Status DoAppend(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { RETURN_NOT_OK(CheckClosed("append")); + auto append_data = static_cast((void*)data); + auto block_content = Azure::Core::IO::MemoryBodyStream( + append_data, strlen(reinterpret_cast(append_data))); + if (block_content.Length() == 0) { + return Status::OK(); + } + + ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_)); + auto size = block_list.CommittedBlocks.size(); + std::string new_block_id; + new_block_id = std::to_string(size + 1); + size_t n = 8; + int precision = n - std::min(n, new_block_id.size()); + new_block_id.insert(0, precision, '0'); + new_block_id = Azure::Core::Convert::Base64Encode( + std::vector(new_block_id.begin(), new_block_id.end())); + + std::vector block_ids; + for (auto block : block_list.CommittedBlocks) { + block_ids.push_back(block.Name); + } try { - auto append_data = static_cast((void*)data); - auto res = block_blob_client_->GetBlockList().Value; - auto size = res.CommittedBlocks.size(); - std::string block_id; - { - block_id = std::to_string(size + 1); - size_t n = 8; - int precision = n - std::min(n, block_id.size()); - block_id.insert(0, precision, '0'); - } - block_id = 
Azure::Core::Convert::Base64Encode( - std::vector(block_id.begin(), block_id.end())); - auto block_content = Azure::Core::IO::MemoryBodyStream( - append_data, strlen(reinterpret_cast(append_data))); - if (block_content.Length() == 0) { - return Status::OK(); - } - block_blob_client_->StageBlock(block_id, block_content); - std::vector block_ids; - for (auto block : res.CommittedBlocks) { - block_ids.push_back(block.Name); - } - block_ids.push_back(block_id); - block_blob_client_->CommitBlockList(block_ids); - pos_ += nbytes; + block_blob_client_->StageBlock(new_block_id, block_content); } catch (const Azure::Storage::StorageException& exception) { - return Status::IOError(exception.RawResponse->GetReasonPhrase()); + return internal::ExceptionToStatus( + "StageBlock failed for '" + block_blob_client_->GetUrl() + "' new_block_id: '" + + new_block_id + + "' with an unexpected Azure error. Staging new blocks is fundamental to " + "streaming writes to blob storage.", + exception); } + block_ids.push_back(new_block_id); + // TODO: Do we really want to commit the block list on every append? 
+ RETURN_NOT_OK(CommitBlockList(block_blob_client_, block_ids)); + pos_ += nbytes; content_length_ += nbytes; return Status::OK(); } Status Flush() override { RETURN_NOT_OK(CheckClosed("flush")); - try { - auto res = block_blob_client_->GetBlockList().Value; - std::vector block_ids; - for (auto block : res.UncommittedBlocks) { - block_ids.push_back(block.Name); - } - block_blob_client_->CommitBlockList(block_ids); - } catch (const Azure::Storage::StorageException& exception) { - return Status::IOError(exception.RawResponse->GetReasonPhrase()); + ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_)); + std::vector block_ids; + for (auto block : block_list.UncommittedBlocks) { + block_ids.push_back(block.Name); } + RETURN_NOT_OK(CommitBlockList(block_blob_client_, block_ids)); return Status::OK(); } From 5a91113614f787636965a847abbed258560fbefb Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sat, 18 Nov 2023 23:44:36 +0000 Subject: [PATCH 12/26] Avoid unnecessary extra block list fetching and committing --- cpp/src/arrow/filesystem/azurefs.cc | 71 +++++++++++++++-------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 6c4be25b7f635..5bd53b2fe9fb8 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -483,7 +483,6 @@ Status CreateEmptyBlockBlob( Result GetBlockList( std::shared_ptr block_blob_client) { try { - // TODO: Can we avoid this call if we know that the file is empty? 
return block_blob_client->GetBlockList().Value; } catch (Azure::Storage::StorageException& exception) { return internal::ExceptionToStatus( @@ -529,35 +528,49 @@ class ObjectAppendStream final : public io::OutputStream { Status Init() { if (content_length_ != kNoSize) { DCHECK_GE(content_length_, 0); - return Status::OK(); + } else { + try { + auto properties = block_blob_client_->GetProperties(); + // TODO: Consider adding a check for whether its a directory. + content_length_ = properties.Value.BlobSize; + pos_ = content_length_; + } catch (const Azure::Storage::StorageException& exception) { + if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_)); + } else { + return internal::ExceptionToStatus( + "GetProperties failed for '" + block_blob_client_->GetUrl() + + "' with an unexpected Azure error. Can not initialise an " + "ObjectAppendStream without knowing whether a file already exists at " + "this path.", + exception); + } + content_length_ = 0; + } } - try { - auto properties = block_blob_client_->GetProperties(); - // TODO: Consider adding a check for whether its a directory. - content_length_ = properties.Value.BlobSize; - pos_ = content_length_; - } catch (const Azure::Storage::StorageException& exception) { - if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { - RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_)); - } else { - return internal::ExceptionToStatus( - "GetProperties failed for '" + block_blob_client_->GetUrl() + - "' with an unexpected Azure error. 
Can not initialise an " - "ObjectAppendStream without knowing whether a file already exists at " - "this path.", - exception); + if (content_length_ > 0) { + ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_)); + for (auto block : block_list.CommittedBlocks) { + block_ids_.push_back(block.Name); } - content_length_ = 0; } return Status::OK(); } - Status Abort() override { return Close(); } + Status Abort() override { + if (closed_) { + return Status::OK(); + } + block_blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } Status Close() override { if (closed_) { return Status::OK(); } + RETURN_NOT_OK(Flush()); block_blob_client_ = nullptr; closed_ = true; return Status::OK(); @@ -595,8 +608,7 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } - ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_)); - auto size = block_list.CommittedBlocks.size(); + auto size = block_ids_.size(); std::string new_block_id; new_block_id = std::to_string(size + 1); size_t n = 8; @@ -605,10 +617,6 @@ class ObjectAppendStream final : public io::OutputStream { new_block_id = Azure::Core::Convert::Base64Encode( std::vector(new_block_id.begin(), new_block_id.end())); - std::vector block_ids; - for (auto block : block_list.CommittedBlocks) { - block_ids.push_back(block.Name); - } try { block_blob_client_->StageBlock(new_block_id, block_content); } catch (const Azure::Storage::StorageException& exception) { @@ -619,9 +627,7 @@ class ObjectAppendStream final : public io::OutputStream { "streaming writes to blob storage.", exception); } - block_ids.push_back(new_block_id); - // TODO: Do we really want to commit the block list on every append? 
- RETURN_NOT_OK(CommitBlockList(block_blob_client_, block_ids)); + block_ids_.push_back(new_block_id); pos_ += nbytes; content_length_ += nbytes; return Status::OK(); @@ -629,13 +635,7 @@ class ObjectAppendStream final : public io::OutputStream { Status Flush() override { RETURN_NOT_OK(CheckClosed("flush")); - ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_)); - std::vector block_ids; - for (auto block : block_list.UncommittedBlocks) { - block_ids.push_back(block.Name); - } - RETURN_NOT_OK(CommitBlockList(block_blob_client_, block_ids)); - return Status::OK(); + return CommitBlockList(block_blob_client_, block_ids_); } protected: @@ -646,6 +646,7 @@ class ObjectAppendStream final : public io::OutputStream { bool closed_ = false; int64_t pos_ = 0; int64_t content_length_ = kNoSize; + std::vector block_ids_; }; } // namespace From 1c4300ba1bfd516b676f79ab33060ce2e3f2146e Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 19 Nov 2023 00:29:49 +0000 Subject: [PATCH 13/26] Adjust block_ids and add some comments --- cpp/src/arrow/filesystem/azurefs.cc | 22 +++++++++++++++++----- cpp/src/arrow/filesystem/azurefs_test.cc | 3 ++- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 5bd53b2fe9fb8..fab37c65461b7 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -497,6 +497,10 @@ Status CommitBlockList( std::shared_ptr block_blob_client, std::vector block_ids) { try { + // CommitBlockList puts all block_ids in the latest element. That means in the case of + // overlapping block_ids the newly staged block ids will always replace the + // previously committed blocks. 
+ // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body block_blob_client->CommitBlockList(block_ids); } catch (const Azure::Storage::StorageException& exception) { return internal::ExceptionToStatus( @@ -609,11 +613,19 @@ class ObjectAppendStream final : public io::OutputStream { } auto size = block_ids_.size(); - std::string new_block_id; - new_block_id = std::to_string(size + 1); - size_t n = 8; - int precision = n - std::min(n, new_block_id.size()); - new_block_id.insert(0, precision, '0'); + + // New block ids must always be distinct from the existing block ids. Otherwise we + // will accidentally replace the content of existing blocks, causing corruption. + // We will use monotonically increasing integers. + std::string new_block_id = std::to_string(size); + + // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks. + const size_t target_number_of_digits = 5; + int required_padding_digits = + target_number_of_digits - std::min(target_number_of_digits, new_block_id.size()); + new_block_id.insert(0, required_padding_digits, '0'); + new_block_id += "-arrow"; // Add a suffix to reduce risk of block_id collisions with + // blocks created by other applications. new_block_id = Azure::Core::Convert::Base64Encode( std::vector(new_block_id.begin(), new_block_id.end())); diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 54c930fc0459e..ae9a2812466de 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -796,7 +796,8 @@ TEST_F(AzuriteFileSystemTest, OpenAppendStreamDoesNotTruncateExistingFile) { ASSERT_OK(output->Write(expected1.data(), expected1.size())); ASSERT_OK(output->Close()); - // Verify that the initial content has been overwritten. + // Verify that the initial content has not been overwritten and that the block from + // the other client was not committed. 
ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); EXPECT_EQ(std::string(inbuf.data(), size), expected0 + expected1); From a62b95e3d6d87dde79235fa1aef825ea04eac2ff Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 19 Nov 2023 01:23:21 +0000 Subject: [PATCH 14/26] Add test for writing blob metadata --- cpp/src/arrow/filesystem/azurefs_test.cc | 81 +++++++++++------------- 1 file changed, 37 insertions(+), 44 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index ae9a2812466de..67a2dfd6e6352 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -647,49 +647,43 @@ TEST_F(AzuriteFileSystemTest, OpenInputStreamClosed) { ASSERT_RAISES(Invalid, stream->Tell()); } -// TEST_F(AzuriteFileSystemTest, TestWriteWithDefaults) { -// auto options = TestGcsOptions(); -// options.default_bucket_location = "utopia"; -// options.default_metadata = arrow::key_value_metadata({{"foo", "bar"}}); -// auto fs = GcsFileSystem::Make(options); -// std::string bucket = "new_bucket_with_default_location"; -// auto file_name = "object_with_defaults"; -// ASSERT_OK(fs->CreateDir(bucket, /*recursive=*/false)); -// const auto path = bucket + "/" + file_name; -// std::shared_ptr output; -// ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, /*metadata=*/{})); -// const auto expected = std::string(kLoremIpsum); -// ASSERT_OK(output->Write(expected.data(), expected.size())); -// ASSERT_OK(output->Close()); - -// // Verify we can read the object back. 
-// std::shared_ptr input; -// ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); - -// std::array inbuf{}; -// std::int64_t size; -// ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); - -// EXPECT_EQ(std::string(inbuf.data(), size), expected); -// auto object = GcsClient().GetObjectMetadata(bucket, file_name); -// ASSERT_TRUE(object.ok()) << "status=" << object.status(); -// EXPECT_EQ(object->mutable_metadata()["foo"], "bar"); -// auto bucket_info = GcsClient().GetBucketMetadata(bucket); -// ASSERT_TRUE(bucket_info.ok()) << "status=" << object.status(); -// EXPECT_EQ(bucket_info->location(), "utopia"); - -// // Check that explicit metadata overrides the defaults. -// ASSERT_OK_AND_ASSIGN( -// output, fs->OpenOutputStream( -// path, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}}))); -// ASSERT_OK(output->Write(expected.data(), expected.size())); -// ASSERT_OK(output->Close()); -// object = GcsClient().GetObjectMetadata(bucket, file_name); -// ASSERT_TRUE(object.ok()) << "status=" << object.status(); -// EXPECT_EQ(object->mutable_metadata()["bar"], "foo"); -// // Defaults are overwritten and not merged. -// EXPECT_FALSE(object->has_metadata("foo")); -// } +TEST_F(AzuriteFileSystemTest, TestWriteMetadata) { + options_.default_metadata = arrow::key_value_metadata({{"foo", "bar"}}); + + std::shared_ptr fs_with_defaults; + ASSERT_OK_AND_ASSIGN(fs_with_defaults, AzureFileSystem::Make(options_)); + std::string path = "object_with_defaults"; + auto location = PreexistingContainerPath() + path; + std::shared_ptr output; + ASSERT_OK_AND_ASSIGN(output, + fs_with_defaults->OpenOutputStream(location, /*metadata=*/{})); + const auto expected = std::string(kLoremIpsum); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + + // Verify the metadata has been set. 
+ auto blob_metadata = + blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path) + .GetProperties() + .Value.Metadata; + EXPECT_EQ(blob_metadata["foo"], "bar"); + + // Check that explicit metadata overrides the defaults. + ASSERT_OK_AND_ASSIGN( + output, fs_with_defaults->OpenOutputStream( + location, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}}))); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + blob_metadata = + blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path) + .GetProperties() + .Value.Metadata; + EXPECT_EQ(blob_metadata["bar"], "foo"); + // Defaults are overwritten and not merged. + EXPECT_EQ(blob_metadata.find("foo"), blob_metadata.end()); +} TEST_F(AzuriteFileSystemTest, OpenOutputStreamSmall) { const auto path = PreexistingContainerPath() + "test-write-object"; @@ -714,7 +708,6 @@ TEST_F(AzuriteFileSystemTest, OpenOutputStreamLarge) { const auto path = PreexistingContainerPath() + "test-write-object"; std::shared_ptr output; ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); - // These buffer sizes are intentionally not multiples of the upload quantum (256 KiB). 
std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; std::array buffers{ std::string(sizes[0], 'A'), From 28ec1edc041335e16575ceb2c37fdc36c4db7c0f Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 19 Nov 2023 01:34:25 +0000 Subject: [PATCH 15/26] Implement metadata writes --- cpp/src/arrow/filesystem/azurefs.cc | 36 ++++++++++++++++++++++------- cpp/src/arrow/filesystem/azurefs.h | 5 ++++ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index fab37c65461b7..200114d89c5c4 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -44,7 +44,8 @@ AzureOptions::AzureOptions() {} bool AzureOptions::Equals(const AzureOptions& other) const { return (account_dfs_url == other.account_dfs_url && account_blob_url == other.account_blob_url && - credentials_kind == other.credentials_kind); + credentials_kind == other.credentials_kind && + default_metadata == other.default_metadata); } Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name, @@ -493,15 +494,26 @@ Result GetBlockList( } } +Azure::Storage::Metadata ArrowMetadataToAzureMetadata( + const std::shared_ptr& arrow_metadata) { + Azure::Storage::Metadata azure_metadata; + for (auto key_value : arrow_metadata->sorted_pairs()) { + azure_metadata[key_value.first] = key_value.second; + } + return azure_metadata; +} + Status CommitBlockList( std::shared_ptr block_blob_client, - std::vector block_ids) { + std::vector block_ids, const Azure::Storage::Metadata& metadata) { + Azure::Storage::Blobs::CommitBlockListOptions options; + options.Metadata = metadata; try { // CommitBlockList puts all block_ids in the latest element. That means in the case of // overlapping block_ids the newly staged block ids will always replace the // previously committed blocks. 
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body - block_blob_client->CommitBlockList(block_ids); + block_blob_client->CommitBlockList(block_ids, options); } catch (const Azure::Storage::StorageException& exception) { return internal::ExceptionToStatus( "CommitBlockList failed for '" + block_blob_client->GetUrl() + @@ -517,11 +529,18 @@ class ObjectAppendStream final : public io::OutputStream { ObjectAppendStream( std::shared_ptr block_blob_client, const io::IOContext& io_context, const AzureLocation& location, - const std::shared_ptr& metadata, int64_t size = kNoSize) + const std::shared_ptr& metadata, + const AzureOptions& options, int64_t size = kNoSize) : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), location_(location), - content_length_(size) {} + content_length_(size) { + if (metadata && metadata->size() != 0) { + metadata_ = ArrowMetadataToAzureMetadata(metadata); + } else if (options.default_metadata && options.default_metadata->size() != 0) { + metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata); + } + } ~ObjectAppendStream() override { // For compliance with the rest of the IO stack, Close rather than Abort, @@ -647,7 +666,7 @@ class ObjectAppendStream final : public io::OutputStream { Status Flush() override { RETURN_NOT_OK(CheckClosed("flush")); - return CommitBlockList(block_blob_client_, block_ids_); + return CommitBlockList(block_blob_client_, block_ids_, metadata_); } protected: @@ -659,6 +678,7 @@ class ObjectAppendStream final : public io::OutputStream { int64_t pos_ = 0; int64_t content_length_ = kNoSize; std::vector block_ids_; + Azure::Storage::Metadata metadata_; }; } // namespace @@ -942,10 +962,10 @@ class AzureFileSystem::Impl { if (truncate) { RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client)); ptr = std::make_shared(block_blob_client, fs->io_context(), - location, metadata, 0); + location, metadata, options_, 0); } else { ptr 
= std::make_shared(block_blob_client, fs->io_context(), - location, metadata); + location, metadata, options_); } RETURN_NOT_OK(ptr->Init()); return ptr; diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 1f7047ff94c56..9f980ee8baae0 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -77,6 +77,11 @@ struct ARROW_EXPORT AzureOptions { std::shared_ptr service_principle_credentials_provider; + /// \brief Default metadata for OpenOutputStream. + /// + /// This will be ignored if non-empty metadata is passed to OpenOutputStream. + std::shared_ptr default_metadata; + AzureOptions(); Status ConfigureAccountKeyCredentials(const std::string& account_name, From 2800dea0e4c9bc1feace5d47a98434a64ef76ae3 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 19 Nov 2023 01:42:02 +0000 Subject: [PATCH 16/26] Add simple sanity checks on the location --- cpp/src/arrow/filesystem/azurefs.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 200114d89c5c4..a58293cc9e065 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -554,7 +554,6 @@ class ObjectAppendStream final : public io::OutputStream { } else { try { auto properties = block_blob_client_->GetProperties(); - // TODO: Consider adding a check for whether its a directory. content_length_ = properties.Value.BlobSize; pos_ = content_length_; } catch (const Azure::Storage::StorageException& exception) { @@ -949,7 +948,8 @@ class AzureFileSystem::Impl { const AzureLocation& location, const std::shared_ptr& metadata, const bool truncate, AzureFileSystem* fs) { - // TODO: Ensure cheap checks which don't require a call to Azure are done. 
+ RETURN_NOT_OK(ValidateFileLocation(location)); + ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path)); if (location.empty() || location.path.empty()) { return ::arrow::fs::internal::PathNotFound(location.path); } From 9a98a0f6bbcbec7e63fd4b33758a5fd9cf0ec6d5 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 19 Nov 2023 02:21:55 +0000 Subject: [PATCH 17/26] Tidy --- cpp/src/arrow/filesystem/azurefs.cc | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index a58293cc9e065..33cfbf5f5cd75 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -473,9 +473,9 @@ Status CreateEmptyBlockBlob( } catch (const Azure::Storage::StorageException& exception) { return internal::ExceptionToStatus( "UploadFrom failed for '" + block_blob_client->GetUrl() + - "' with an unexpected Azure error. There is new existing blob at this " - "path " - "so ObjectAppendStream must create a new empty block blob.", + "' with an unexpected Azure error. There is no existing blob at this " + "location or the existing blob must be replaced so ObjectAppendStream must " + "create a new empty block blob.", exception); } return Status::OK(); @@ -505,7 +505,7 @@ Azure::Storage::Metadata ArrowMetadataToAzureMetadata( Status CommitBlockList( std::shared_ptr block_blob_client, - std::vector block_ids, const Azure::Storage::Metadata& metadata) { + const std::vector& block_ids, const Azure::Storage::Metadata& metadata) { Azure::Storage::Blobs::CommitBlockListOptions options; options.Metadata = metadata; try { @@ -517,8 +517,8 @@ Status CommitBlockList( } catch (const Azure::Storage::StorageException& exception) { return internal::ExceptionToStatus( "CommitBlockList failed for '" + block_blob_client->GetUrl() + - "' with an unexpected Azure error. 
Committing the block list is " - "fundamental to streaming writes to blob storage.", + "' with an unexpected Azure error. Committing is required to flush an " + "output/append stream.", exception); } return Status::OK(); @@ -551,6 +551,7 @@ class ObjectAppendStream final : public io::OutputStream { Status Init() { if (content_length_ != kNoSize) { DCHECK_GE(content_length_, 0); + pos_ = content_length_; } else { try { auto properties = block_blob_client_->GetProperties(); @@ -564,7 +565,7 @@ class ObjectAppendStream final : public io::OutputStream { "GetProperties failed for '" + block_blob_client_->GetUrl() + "' with an unexpected Azure error. Can not initialise an " "ObjectAppendStream without knowing whether a file already exists at " - "this path.", + "this path, and if it exists, its size.", exception); } content_length_ = 0; @@ -623,7 +624,7 @@ class ObjectAppendStream final : public io::OutputStream { Status DoAppend(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { RETURN_NOT_OK(CheckClosed("append")); - auto append_data = static_cast((void*)data); + auto append_data = reinterpret_cast((void*)data); auto block_content = Azure::Core::IO::MemoryBodyStream( append_data, strlen(reinterpret_cast(append_data))); if (block_content.Length() == 0) { @@ -950,9 +951,6 @@ class AzureFileSystem::Impl { AzureFileSystem* fs) { RETURN_NOT_OK(ValidateFileLocation(location)); ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path)); - if (location.empty() || location.path.empty()) { - return ::arrow::fs::internal::PathNotFound(location.path); - } auto block_blob_client = std::make_shared( blob_service_client_->GetBlobContainerClient(location.container) From d86c3742ed3dfd62328094367d22df0b9fb91be0 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 19 Nov 2023 12:38:24 +0000 Subject: [PATCH 18/26] PR comments 1 --- cpp/src/arrow/filesystem/azurefs.cc | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) 
diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 33cfbf5f5cd75..39a37018321a9 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -466,10 +466,8 @@ class ObjectInputFile final : public io::RandomAccessFile { Status CreateEmptyBlockBlob( std::shared_ptr block_blob_client) { - std::string s = ""; try { - block_blob_client->UploadFrom( - const_cast(reinterpret_cast(s.data())), s.size()); + block_blob_client->UploadFrom(nullptr, 0); } catch (const Azure::Storage::StorageException& exception) { return internal::ExceptionToStatus( "UploadFrom failed for '" + block_blob_client->GetUrl() + @@ -624,23 +622,22 @@ class ObjectAppendStream final : public io::OutputStream { Status DoAppend(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { RETURN_NOT_OK(CheckClosed("append")); - auto append_data = reinterpret_cast((void*)data); - auto block_content = Azure::Core::IO::MemoryBodyStream( - append_data, strlen(reinterpret_cast(append_data))); + auto append_data = reinterpret_cast(data); + auto block_content = Azure::Core::IO::MemoryBodyStream(append_data, nbytes); if (block_content.Length() == 0) { return Status::OK(); } - auto size = block_ids_.size(); + const auto n_block_ids = block_ids_.size(); - // New block ids must always be distinct from the existing block ids. Otherwise we + // New block ID must always be distinct from the existing block IDs. Otherwise we // will accidentally replace the content of existing blocks, causing corruption. // We will use monotonically increasing integers. - std::string new_block_id = std::to_string(size); + std::string new_block_id = std::to_string(n_block_ids); // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks. 
const size_t target_number_of_digits = 5; - int required_padding_digits = + const auto required_padding_digits = target_number_of_digits - std::min(target_number_of_digits, new_block_id.size()); new_block_id.insert(0, required_padding_digits, '0'); new_block_id += "-arrow"; // Add a suffix to reduce risk of block_id collisions with @@ -669,7 +666,7 @@ class ObjectAppendStream final : public io::OutputStream { return CommitBlockList(block_blob_client_, block_ids_, metadata_); } - protected: + private: std::shared_ptr block_blob_client_; const io::IOContext io_context_; const AzureLocation location_; @@ -956,17 +953,17 @@ class AzureFileSystem::Impl { blob_service_client_->GetBlobContainerClient(location.container) .GetBlockBlobClient(location.path)); - std::shared_ptr ptr; + std::shared_ptr stream; if (truncate) { RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client)); - ptr = std::make_shared(block_blob_client, fs->io_context(), + stream = std::make_shared(block_blob_client, fs->io_context(), location, metadata, options_, 0); } else { - ptr = std::make_shared(block_blob_client, fs->io_context(), + stream = std::make_shared(block_blob_client, fs->io_context(), location, metadata, options_); } - RETURN_NOT_OK(ptr->Init()); - return ptr; + RETURN_NOT_OK(stream->Init()); + return stream; } }; From 77f8345f3bcafe29b4f99b95c2e2bb3b9233dee6 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 19 Nov 2023 12:40:50 +0000 Subject: [PATCH 19/26] PR comments2: move `DoAppend` to private --- cpp/src/arrow/filesystem/azurefs.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 39a37018321a9..a9613d4824205 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -619,6 +619,12 @@ class ObjectAppendStream final : public io::OutputStream { return DoAppend(data, nbytes); } + Status Flush() override { + 
RETURN_NOT_OK(CheckClosed("flush")); + return CommitBlockList(block_blob_client_, block_ids_, metadata_); + } + + private: Status DoAppend(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { RETURN_NOT_OK(CheckClosed("append")); @@ -661,12 +667,6 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } - Status Flush() override { - RETURN_NOT_OK(CheckClosed("flush")); - return CommitBlockList(block_blob_client_, block_ids_, metadata_); - } - - private: std::shared_ptr block_blob_client_; const io::IOContext io_context_; const AzureLocation location_; @@ -957,10 +957,10 @@ class AzureFileSystem::Impl { if (truncate) { RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client)); stream = std::make_shared(block_blob_client, fs->io_context(), - location, metadata, options_, 0); + location, metadata, options_, 0); } else { stream = std::make_shared(block_blob_client, fs->io_context(), - location, metadata, options_); + location, metadata, options_); } RETURN_NOT_OK(stream->Init()); return stream; From 0178660f867f23ad01075f394cf0d9d6eb552013 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 19 Nov 2023 13:02:34 +0000 Subject: [PATCH 20/26] Autoformat --- cpp/src/arrow/filesystem/azurefs_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 67a2dfd6e6352..aabc164de3b89 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -675,8 +675,7 @@ TEST_F(AzuriteFileSystemTest, TestWriteMetadata) { location, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}}))); ASSERT_OK(output->Write(expected.data(), expected.size())); ASSERT_OK(output->Close()); - blob_metadata = - blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) + blob_metadata = blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) .GetBlockBlobClient(path) 
.GetProperties() .Value.Metadata; From 7b19febfe240507b2652a2def72a4b46a29d89a7 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Sun, 19 Nov 2023 23:27:41 +0000 Subject: [PATCH 21/26] Handle TODO(GH-38780) comments for using fs to write data in tests --- cpp/src/arrow/filesystem/azurefs_test.cc | 45 +++++++++++++----------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index aabc164de3b89..e1fd6d19065af 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -232,13 +232,11 @@ class AzureFileSystemTest : public ::testing::Test { void UploadLines(const std::vector& lines, const char* path_to_file, int total_size) { - // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented. - auto blob_client = - blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) - .GetBlockBlobClient(path_to_file); + const auto path = PreexistingContainerPath() + path_to_file; + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); std::string all_lines = std::accumulate(lines.begin(), lines.end(), std::string("")); - blob_client.UploadFrom(reinterpret_cast(all_lines.data()), - total_size); + ASSERT_OK(output->Write(all_lines.data(), all_lines.size())); + ASSERT_OK(output->Close()); } void RunGetFileInfoObjectWithNestedStructureTest(); @@ -347,21 +345,26 @@ void AzureFileSystemTest::RunGetFileInfoObjectWithNestedStructureTest() { // Adds detailed tests to handle cases of different edge cases // with directory naming conventions (e.g. with and without slashes). constexpr auto kObjectName = "test-object-dir/some_other_dir/another_dir/foo"; - // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented. 
- blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) - .GetBlockBlobClient(kObjectName) - .UploadFrom(reinterpret_cast(kLoremIpsum), strlen(kLoremIpsum)); + ASSERT_OK_AND_ASSIGN( + auto output, + fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName, /*metadata=*/{})); + const auto data = std::string(kLoremIpsum); + ASSERT_OK(output->Write(data.data(), data.size())); + ASSERT_OK(output->Close()); // 0 is immediately after "/" lexicographically, ensure that this doesn't // cause unexpected issues. - // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented. - blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) - .GetBlockBlobClient("test-object-dir/some_other_dir0") - .UploadFrom(reinterpret_cast(kLoremIpsum), strlen(kLoremIpsum)); - - blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) - .GetBlockBlobClient(std::string(kObjectName) + "0") - .UploadFrom(reinterpret_cast(kLoremIpsum), strlen(kLoremIpsum)); + ASSERT_OK_AND_ASSIGN(output, + fs_->OpenOutputStream( + PreexistingContainerPath() + "test-object-dir/some_other_dir0", + /*metadata=*/{})); + ASSERT_OK(output->Write(data.data(), data.size())); + ASSERT_OK(output->Close()); + ASSERT_OK_AND_ASSIGN( + output, fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName + "0", + /*metadata=*/{})); + ASSERT_OK(output->Write(data.data(), data.size())); + ASSERT_OK(output->Close()); AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName, FileType::File); AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName + "/", @@ -676,9 +679,9 @@ TEST_F(AzuriteFileSystemTest, TestWriteMetadata) { ASSERT_OK(output->Write(expected.data(), expected.size())); ASSERT_OK(output->Close()); blob_metadata = blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) - .GetBlockBlobClient(path) - .GetProperties() - .Value.Metadata; + .GetBlockBlobClient(path) + .GetProperties() + .Value.Metadata; 
EXPECT_EQ(blob_metadata["bar"], "foo"); // Defaults are overwritten and not merged. EXPECT_EQ(blob_metadata.find("foo"), blob_metadata.end()); From 74a6d55a246c67d97a5bcc1fb8f6d483cc864444 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Mon, 20 Nov 2023 09:31:39 +0000 Subject: [PATCH 22/26] PR comments: tests tidy up --- cpp/src/arrow/filesystem/azurefs_test.cc | 97 ++++++++++-------------- 1 file changed, 42 insertions(+), 55 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index e1fd6d19065af..7c577622c2a6d 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -234,8 +234,8 @@ class AzureFileSystemTest : public ::testing::Test { int total_size) { const auto path = PreexistingContainerPath() + path_to_file; ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); - std::string all_lines = std::accumulate(lines.begin(), lines.end(), std::string("")); - ASSERT_OK(output->Write(all_lines.data(), all_lines.size())); + const auto all_lines = std::accumulate(lines.begin(), lines.end(), std::string("")); + ASSERT_OK(output->Write(all_lines)); ASSERT_OK(output->Close()); } @@ -348,8 +348,8 @@ void AzureFileSystemTest::RunGetFileInfoObjectWithNestedStructureTest() { ASSERT_OK_AND_ASSIGN( auto output, fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName, /*metadata=*/{})); - const auto data = std::string(kLoremIpsum); - ASSERT_OK(output->Write(data.data(), data.size())); + const std::string_view data(kLoremIpsum); + ASSERT_OK(output->Write(data)); ASSERT_OK(output->Close()); // 0 is immediately after "/" lexicographically, ensure that this doesn't @@ -358,12 +358,12 @@ void AzureFileSystemTest::RunGetFileInfoObjectWithNestedStructureTest() { fs_->OpenOutputStream( PreexistingContainerPath() + "test-object-dir/some_other_dir0", /*metadata=*/{})); - ASSERT_OK(output->Write(data.data(), data.size())); + ASSERT_OK(output->Write(data)); 
ASSERT_OK(output->Close()); ASSERT_OK_AND_ASSIGN( output, fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName + "0", /*metadata=*/{})); - ASSERT_OK(output->Write(data.data(), data.size())); + ASSERT_OK(output->Write(data)); ASSERT_OK(output->Close()); AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName, FileType::File); @@ -653,15 +653,13 @@ TEST_F(AzuriteFileSystemTest, OpenInputStreamClosed) { TEST_F(AzuriteFileSystemTest, TestWriteMetadata) { options_.default_metadata = arrow::key_value_metadata({{"foo", "bar"}}); - std::shared_ptr fs_with_defaults; - ASSERT_OK_AND_ASSIGN(fs_with_defaults, AzureFileSystem::Make(options_)); + ASSERT_OK_AND_ASSIGN(auto fs_with_defaults, AzureFileSystem::Make(options_)); std::string path = "object_with_defaults"; auto location = PreexistingContainerPath() + path; - std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, + ASSERT_OK_AND_ASSIGN(auto output, fs_with_defaults->OpenOutputStream(location, /*metadata=*/{})); - const auto expected = std::string(kLoremIpsum); - ASSERT_OK(output->Write(expected.data(), expected.size())); + const std::string_view expected(kLoremIpsum); + ASSERT_OK(output->Write(expected)); ASSERT_OK(output->Close()); // Verify the metadata has been set. @@ -670,46 +668,42 @@ TEST_F(AzuriteFileSystemTest, TestWriteMetadata) { .GetBlockBlobClient(path) .GetProperties() .Value.Metadata; - EXPECT_EQ(blob_metadata["foo"], "bar"); + EXPECT_EQ(Azure::Core::CaseInsensitiveMap{std::make_pair("foo", "bar")}, blob_metadata); // Check that explicit metadata overrides the defaults. 
ASSERT_OK_AND_ASSIGN( output, fs_with_defaults->OpenOutputStream( location, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}}))); - ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Write(expected)); ASSERT_OK(output->Close()); blob_metadata = blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) .GetBlockBlobClient(path) .GetProperties() .Value.Metadata; - EXPECT_EQ(blob_metadata["bar"], "foo"); // Defaults are overwritten and not merged. - EXPECT_EQ(blob_metadata.find("foo"), blob_metadata.end()); + EXPECT_EQ(Azure::Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata); } TEST_F(AzuriteFileSystemTest, OpenOutputStreamSmall) { const auto path = PreexistingContainerPath() + "test-write-object"; std::shared_ptr output; ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); - const auto expected = std::string(kLoremIpsum); - ASSERT_OK(output->Write(expected.data(), expected.size())); + const std::string_view expected(kLoremIpsum); + ASSERT_OK(output->Write(expected)); ASSERT_OK(output->Close()); // Verify we can read the object back. 
- std::shared_ptr input; - ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path)); std::array inbuf{}; - std::int64_t size; - ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); - EXPECT_EQ(std::string(inbuf.data(), size), expected); + EXPECT_EQ(expected, std::string_view(inbuf.data(), size)); } TEST_F(AzuriteFileSystemTest, OpenOutputStreamLarge) { const auto path = PreexistingContainerPath() + "test-write-object"; - std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; std::array buffers{ std::string(sizes[0], 'A'), @@ -718,15 +712,14 @@ TEST_F(AzuriteFileSystemTest, OpenOutputStreamLarge) { }; auto expected = std::int64_t{0}; for (auto i = 0; i != 3; ++i) { - ASSERT_OK(output->Write(buffers[i].data(), buffers[i].size())); + ASSERT_OK(output->Write(buffers[i])); expected += sizes[i]; - ASSERT_EQ(output->Tell(), expected); + ASSERT_EQ(expected, output->Tell()); } ASSERT_OK(output->Close()); // Verify we can read the object back. 
- std::shared_ptr input; - ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path)); std::string contents; std::shared_ptr buffer; @@ -741,68 +734,62 @@ TEST_F(AzuriteFileSystemTest, OpenOutputStreamLarge) { TEST_F(AzuriteFileSystemTest, OpenOutputStreamTruncatesExistingFile) { const auto path = PreexistingContainerPath() + "test-write-object"; - std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); - const std::string expected0 = "Existing blob content"; - ASSERT_OK(output->Write(expected0.data(), expected0.size())); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); + const std::string_view expected0("Existing blob content"); + ASSERT_OK(output->Write(expected0)); ASSERT_OK(output->Close()); // Check that the initial content has been written - if not this test is not achieving // what it's meant to. - std::shared_ptr input; - ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path)); std::array inbuf{}; - std::int64_t size; - ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); - EXPECT_EQ(std::string(inbuf.data(), size), expected0); + ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(expected0, std::string_view(inbuf.data(), size)); ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); - const auto expected1 = std::string(kLoremIpsum); - ASSERT_OK(output->Write(expected1.data(), expected1.size())); + const std::string_view expected1(kLoremIpsum); + ASSERT_OK(output->Write(expected1)); ASSERT_OK(output->Close()); // Verify that the initial content has been overwritten. 
ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); - EXPECT_EQ(std::string(inbuf.data(), size), expected1); + EXPECT_EQ(expected1, std::string_view(inbuf.data(), size)); } TEST_F(AzuriteFileSystemTest, OpenAppendStreamDoesNotTruncateExistingFile) { const auto path = PreexistingContainerPath() + "test-write-object"; - std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); - const std::string expected0 = "Existing blob content"; - ASSERT_OK(output->Write(expected0.data(), expected0.size())); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); + const std::string_view expected0("Existing blob content"); + ASSERT_OK(output->Write(expected0)); ASSERT_OK(output->Close()); // Check that the initial content has been written - if not this test is not achieving // what it's meant to. - std::shared_ptr input; - ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path)); std::array inbuf{}; - std::int64_t size; - ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); - EXPECT_EQ(std::string(inbuf.data(), size), expected0); + ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(expected0, std::string_view(inbuf.data())); ASSERT_OK_AND_ASSIGN(output, fs_->OpenAppendStream(path, {})); - const auto expected1 = std::string(kLoremIpsum); - ASSERT_OK(output->Write(expected1.data(), expected1.size())); + const std::string_view expected1(kLoremIpsum); + ASSERT_OK(output->Write(expected1)); ASSERT_OK(output->Close()); // Verify that the initial content has not been overwritten and that the block from // the other client was not committed. 
ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); - EXPECT_EQ(std::string(inbuf.data(), size), expected0 + expected1); + EXPECT_EQ(std::string(inbuf.data(), size), + std::string(expected0) + std::string(expected1)); } TEST_F(AzuriteFileSystemTest, OpenOutputStreamClosed) { const auto path = internal::ConcatAbstractPath(PreexistingContainerName(), "open-output-stream-closed.txt"); - std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); ASSERT_OK(output->Close()); ASSERT_RAISES(Invalid, output->Write(kLoremIpsum, std::strlen(kLoremIpsum))); ASSERT_RAISES(Invalid, output->Flush()); From 9a0d9caecfdc858c2e545c76bd20e4c5c41aa2d8 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Mon, 20 Nov 2023 10:00:13 +0000 Subject: [PATCH 23/26] Add a comment about the risk of overlapping block ids --- cpp/src/arrow/filesystem/azurefs.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index a9613d4824205..bc341b9cb1d1e 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -646,8 +646,12 @@ class ObjectAppendStream final : public io::OutputStream { const auto required_padding_digits = target_number_of_digits - std::min(target_number_of_digits, new_block_id.size()); new_block_id.insert(0, required_padding_digits, '0'); - new_block_id += "-arrow"; // Add a suffix to reduce risk of block_id collisions with - // blocks created by other applications. + // There is a small risk when appending to a blob created by another client that + // `new_block_id` may overlapping with an existing block id. Adding the `-arrow` + // suffix significantly reduces the risk, but does not 100% eliminate it. 
For example + // if the blob was previously created with one block, with id `00001-arrow` then the + // next block we append will conflict with that, and cause corruption. + new_block_id += "-arrow"; new_block_id = Azure::Core::Convert::Base64Encode( std::vector(new_block_id.begin(), new_block_id.end())); From 0e84a82ccd0ee09dfa5ed8c582999180a4ea8bce Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Mon, 20 Nov 2023 10:00:57 +0000 Subject: [PATCH 24/26] Lint --- cpp/src/arrow/filesystem/azurefs_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 7c577622c2a6d..7cbb8c77a76c8 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -696,7 +696,7 @@ TEST_F(AzuriteFileSystemTest, OpenOutputStreamSmall) { ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path)); std::array inbuf{}; - ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); + ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); EXPECT_EQ(expected, std::string_view(inbuf.data(), size)); } From ad0ed8fa4d9b705d4d2f0cc05b76d783a10f4231 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Mon, 20 Nov 2023 18:15:07 +0000 Subject: [PATCH 25/26] Lint --- cpp/src/arrow/filesystem/azurefs.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index bc341b9cb1d1e..2e4dd6684d7b3 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -646,10 +646,10 @@ class ObjectAppendStream final : public io::OutputStream { const auto required_padding_digits = target_number_of_digits - std::min(target_number_of_digits, new_block_id.size()); new_block_id.insert(0, required_padding_digits, '0'); - // There is a small risk when appending to a blob created by another client that - // `new_block_id` may 
overlapping with an existing block id. Adding the `-arrow` - // suffix significantly reduces the risk, but does not 100% eliminate it. For example - if the blob was previously created with one block, with id `00001-arrow` then the + // There is a small risk when appending to a blob created by another client that + // `new_block_id` may overlap with an existing block id. Adding the `-arrow` + // suffix significantly reduces the risk, but does not 100% eliminate it. For example + if the blob was previously created with one block, with id `00001-arrow` then the // next block we append will conflict with that, and cause corruption. new_block_id += "-arrow"; new_block_id = Azure::Core::Convert::Base64Encode( From 7412b902e4e9861d20b34604b83b0bf9c4770710 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Tue, 21 Nov 2023 11:57:01 +0900 Subject: [PATCH 26/26] Simplify --- cpp/src/arrow/filesystem/azurefs.cc | 4 ++-- cpp/src/arrow/filesystem/azurefs_test.cc | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 2e4dd6684d7b3..2c3d81ca24c51 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -629,7 +629,7 @@ class ObjectAppendStream final : public io::OutputStream { std::shared_ptr owned_buffer = nullptr) { RETURN_NOT_OK(CheckClosed("append")); auto append_data = reinterpret_cast(data); - auto block_content = Azure::Core::IO::MemoryBodyStream(append_data, nbytes); + Azure::Core::IO::MemoryBodyStream block_content(append_data, nbytes); if (block_content.Length() == 0) { return Status::OK(); } @@ -639,7 +639,7 @@ class ObjectAppendStream final : public io::OutputStream { // New block ID must always be distinct from the existing block IDs. Otherwise we // will accidentally replace the content of existing blocks, causing corruption. // We will use monotonically increasing integers. 
- std::string new_block_id = std::to_string(n_block_ids); + auto new_block_id = std::to_string(n_block_ids); // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks. const size_t target_number_of_digits = 5; diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 7cbb8c77a76c8..e9b9a6f34b88c 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -686,8 +686,7 @@ TEST_F(AzuriteFileSystemTest, TestWriteMetadata) { TEST_F(AzuriteFileSystemTest, OpenOutputStreamSmall) { const auto path = PreexistingContainerPath() + "test-write-object"; - std::shared_ptr output; - ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); const std::string_view expected(kLoremIpsum); ASSERT_OK(output->Write(expected)); ASSERT_OK(output->Close());