Skip to content

Commit

Permalink
Merge branch 'main' into minor-enhance-create-page-writer
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Mar 26, 2024
2 parents f970264 + e3b0bd1 commit 9265b08
Show file tree
Hide file tree
Showing 39 changed files with 798 additions and 150 deletions.
4 changes: 3 additions & 1 deletion cpp/cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -1812,7 +1812,9 @@ if(ARROW_WITH_PROTOBUF)
else()
set(ARROW_PROTOBUF_REQUIRED_VERSION "2.6.1")
endif()
if(ARROW_ORC OR ARROW_WITH_OPENTELEMETRY)
if(ARROW_ORC
OR ARROW_SUBSTRAIT
OR ARROW_WITH_OPENTELEMETRY)
set(ARROW_PROTOBUF_ARROW_CMAKE_PACKAGE_NAME "Arrow")
set(ARROW_PROTOBUF_ARROW_PC_PACKAGE_NAME "arrow")
elseif(ARROW_FLIGHT)
Expand Down
174 changes: 161 additions & 13 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,21 @@
#define ARROW_S3_HAS_CRT
#endif

#if ARROW_AWS_SDK_VERSION_CHECK(1, 10, 0)
#define ARROW_S3_HAS_S3CLIENT_CONFIGURATION
#endif

#ifdef ARROW_S3_HAS_CRT
#include <aws/crt/io/Bootstrap.h>
#include <aws/crt/io/EventLoopGroup.h>
#include <aws/crt/io/HostResolver.h>
#endif

#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
#include <aws/s3/S3ClientConfiguration.h>
#include <aws/s3/S3EndpointProvider.h>
#endif

#include "arrow/util/windows_fixup.h"

#include "arrow/buffer.h"
Expand All @@ -128,19 +137,17 @@
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"

namespace arrow {

using internal::TaskGroup;
using internal::ToChars;
using io::internal::SubmitIO;
using util::Uri;

namespace fs {
namespace arrow::fs {

using ::Aws::Client::AWSError;
using ::Aws::S3::S3Errors;
namespace S3Model = Aws::S3::Model;

using ::arrow::internal::TaskGroup;
using ::arrow::internal::ToChars;
using ::arrow::io::internal::SubmitIO;
using ::arrow::util::Uri;

using internal::ConnectRetryStrategy;
using internal::DetectS3Backend;
using internal::ErrorToStatus;
Expand Down Expand Up @@ -913,6 +920,134 @@ Result<std::shared_ptr<S3ClientHolder>> GetClientHolder(
// -----------------------------------------------------------------------
// S3 client factory: build S3Client from S3Options

#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION

// GH-40279: standard initialization of S3Client creates a new `S3EndpointProvider`
// every time. Its construction takes 1ms, which makes instantiating every S3Client
// very costly (see upstream bug report
// at https://github.com/aws/aws-sdk-cpp/issues/2880).
// To work around this, we build and cache `S3EndpointProvider` instances
// for each distinct endpoint configuration, and reuse them whenever possible.
// Since most applications tend to use a single endpoint configuration, this
// makes the 1ms setup cost a once-per-process overhead, making it much more
// bearable - if not ideal.

struct EndpointConfigKey {
explicit EndpointConfigKey(const Aws::S3::S3ClientConfiguration& config)
: region(config.region),
scheme(config.scheme),
endpoint_override(config.endpointOverride),
use_virtual_addressing(config.useVirtualAddressing) {}

Aws::String region;
Aws::Http::Scheme scheme;
Aws::String endpoint_override;
bool use_virtual_addressing;

bool operator==(const EndpointConfigKey& other) const noexcept {
return region == other.region && scheme == other.scheme &&
endpoint_override == other.endpoint_override &&
use_virtual_addressing == other.use_virtual_addressing;
}
};

} // namespace
} // namespace arrow::fs

template <>
struct std::hash<arrow::fs::EndpointConfigKey> {
std::size_t operator()(const arrow::fs::EndpointConfigKey& key) const noexcept {
// A crude hash is sufficient since we expect the cache to remain very small.
auto h = std::hash<Aws::String>{};
return h(key.region) ^ h(key.endpoint_override);
}
};

namespace arrow::fs {
namespace {

// EndpointProvider configuration happens in a non-thread-safe way, even
// when the updates are idempotent. This is a problem when trying to reuse
// a single EndpointProvider from several clients.
// To work around this, this class ensures reconfiguration of an existing
// EndpointProvider is a no-op.
class InitOnceEndpointProvider : public Aws::S3::S3EndpointProviderBase {
public:
explicit InitOnceEndpointProvider(
std::shared_ptr<Aws::S3::S3EndpointProviderBase> wrapped)
: wrapped_(std::move(wrapped)) {}

void InitBuiltInParameters(const Aws::S3::S3ClientConfiguration& config) override {}

void OverrideEndpoint(const Aws::String& endpoint) override {
ARROW_LOG(ERROR) << "unexpected call to InitOnceEndpointProvider::OverrideEndpoint";
}
Aws::S3::Endpoint::S3ClientContextParameters& AccessClientContextParameters() override {
ARROW_LOG(ERROR)
<< "unexpected call to InitOnceEndpointProvider::AccessClientContextParameters";
// Need to return a reference to something...
return wrapped_->AccessClientContextParameters();
}

const Aws::S3::Endpoint::S3ClientContextParameters& GetClientContextParameters()
const override {
return wrapped_->GetClientContextParameters();
}
Aws::Endpoint::ResolveEndpointOutcome ResolveEndpoint(
const Aws::Endpoint::EndpointParameters& params) const override {
return wrapped_->ResolveEndpoint(params);
}

protected:
std::shared_ptr<Aws::S3::S3EndpointProviderBase> wrapped_;
};

// A class that instantiates a single EndpointProvider per distinct endpoint
// configuration and initializes it in a thread-safe way. See earlier comments
// for rationale.
class EndpointProviderCache {
public:
std::shared_ptr<Aws::S3::S3EndpointProviderBase> Lookup(
const Aws::S3::S3ClientConfiguration& config) {
auto key = EndpointConfigKey(config);
CacheValue* value;
{
std::unique_lock lock(mutex_);
value = &cache_[std::move(key)];
}
std::call_once(value->once, [&]() {
auto endpoint_provider = std::make_shared<Aws::S3::S3EndpointProvider>();
endpoint_provider->InitBuiltInParameters(config);
value->endpoint_provider =
std::make_shared<InitOnceEndpointProvider>(std::move(endpoint_provider));
});
return value->endpoint_provider;
}

void Reset() {
std::unique_lock lock(mutex_);
cache_.clear();
}

static EndpointProviderCache* Instance() {
static EndpointProviderCache instance;
return &instance;
}

private:
EndpointProviderCache() = default;

struct CacheValue {
std::once_flag once;
std::shared_ptr<Aws::S3::S3EndpointProviderBase> endpoint_provider;
};

std::mutex mutex_;
std::unordered_map<EndpointConfigKey, CacheValue> cache_;
};

#endif // ARROW_S3_HAS_S3CLIENT_CONFIGURATION

class ClientBuilder {
public:
explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
Expand Down Expand Up @@ -958,9 +1093,6 @@ class ClientBuilder {
client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
}

const bool use_virtual_addressing =
options_.endpoint_override.empty() || options_.force_virtual_addressing;

// Set proxy options if provided
if (!options_.proxy_options.scheme.empty()) {
if (options_.proxy_options.scheme == "http") {
Expand Down Expand Up @@ -990,10 +1122,20 @@ class ClientBuilder {
client_config_.maxConnections = std::max(io_context->executor()->GetCapacity(), 25);
}

const bool use_virtual_addressing =
options_.endpoint_override.empty() || options_.force_virtual_addressing;

#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
client_config_.useVirtualAddressing = use_virtual_addressing;
auto endpoint_provider = EndpointProviderCache::Instance()->Lookup(client_config_);
auto client = std::make_shared<S3Client>(credentials_provider_, endpoint_provider,
client_config_);
#else
auto client = std::make_shared<S3Client>(
credentials_provider_, client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing);
#endif
client->s3_retry_strategy_ = options_.retry_strategy;
return GetClientHolder(std::move(client));
}
Expand All @@ -1002,7 +1144,11 @@ class ClientBuilder {

protected:
S3Options options_;
#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
Aws::S3::S3ClientConfiguration client_config_;
#else
Aws::Client::ClientConfiguration client_config_;
#endif
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
};

Expand Down Expand Up @@ -2949,6 +3095,9 @@ struct AwsInstance {
"This could lead to a segmentation fault at exit";
}
GetClientFinalizer()->Finalize();
#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION
EndpointProviderCache::Instance()->Reset();
#endif
Aws::ShutdownAPI(aws_options_);
}
}
Expand Down Expand Up @@ -3090,5 +3239,4 @@ Result<std::string> ResolveS3BucketRegion(const std::string& bucket) {
return resolver->ResolveRegion(bucket);
}

} // namespace fs
} // namespace arrow
} // namespace arrow::fs
12 changes: 6 additions & 6 deletions cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
}
#endif

stats_.UpdateAllocatedBytes(size);
stats_.DidAllocateBytes(size);
return Status::OK();
}

Expand All @@ -494,7 +494,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
}
#endif

stats_.UpdateAllocatedBytes(new_size - old_size);
stats_.DidReallocateBytes(old_size, new_size);
return Status::OK();
}

Expand All @@ -509,7 +509,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
#endif
Allocator::DeallocateAligned(buffer, size, alignment);

stats_.UpdateAllocatedBytes(-size, /*is_free*/ true);
stats_.DidFreeBytes(size);
}

void ReleaseUnused() override { Allocator::ReleaseUnused(); }
Expand Down Expand Up @@ -761,20 +761,20 @@ class ProxyMemoryPool::ProxyMemoryPoolImpl {

Status Allocate(int64_t size, int64_t alignment, uint8_t** out) {
RETURN_NOT_OK(pool_->Allocate(size, alignment, out));
stats_.UpdateAllocatedBytes(size);
stats_.DidAllocateBytes(size);
return Status::OK();
}

Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) {
RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, alignment, ptr));
stats_.UpdateAllocatedBytes(new_size - old_size);
stats_.DidReallocateBytes(old_size, new_size);
return Status::OK();
}

void Free(uint8_t* buffer, int64_t size, int64_t alignment) {
pool_->Free(buffer, size, alignment);
stats_.UpdateAllocatedBytes(-size, /*is_free=*/true);
stats_.DidFreeBytes(size);
}

int64_t bytes_allocated() const { return stats_.bytes_allocated(); }
Expand Down
Loading

0 comments on commit 9265b08

Please sign in to comment.