From 7c2efec10cc561f8c1be3e4df576df5b01a7785b Mon Sep 17 00:00:00 2001 From: "Ukjae Jeong (Jay)" Date: Fri, 22 Sep 2023 15:49:45 +0900 Subject: [PATCH] make executor pool size configurable (#1857) --- .../core/filesystems/s3/s3_filesystem.cc | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tensorflow_io/core/filesystems/s3/s3_filesystem.cc b/tensorflow_io/core/filesystems/s3/s3_filesystem.cc index 3217edb85..654269536 100644 --- a/tensorflow_io/core/filesystems/s3/s3_filesystem.cc +++ b/tensorflow_io/core/filesystems/s3/s3_filesystem.cc @@ -203,14 +203,26 @@ static void GetS3Client(tf_s3_filesystem::S3File* s3_file) { } } -static void GetExecutor(tf_s3_filesystem::S3File* s3_file) { +// GetExecutor initializes the executor in s3_file if it is not initialized. +// +// This function returns executor_pool_size that is used in initialization, +// but the caller can ignore the return value. +static int GetExecutor(tf_s3_filesystem::S3File* s3_file) { absl::MutexLock l(&s3_file->initialization_lock); + int temp_value; + const char* executor_pool_size = getenv("S3_EXECUTOR_POOL_SIZE"); + if (executor_pool_size == nullptr || + !absl::SimpleAtoi(executor_pool_size, &temp_value)) + temp_value = kExecutorPoolSize; + if (s3_file->executor.get() == nullptr) { s3_file->executor = Aws::MakeShared( - kExecutorTag, kExecutorPoolSize); + kExecutorTag, temp_value); } + + return temp_value; } static void GetTransferManager( @@ -218,7 +230,7 @@ static void GetTransferManager( tf_s3_filesystem::S3File* s3_file) { // These functions should be called before holding `initialization_lock`. GetS3Client(s3_file); - GetExecutor(s3_file); + int executor_pool_size = GetExecutor(s3_file); absl::MutexLock l(&s3_file->initialization_lock); @@ -242,7 +254,7 @@ static void GetTransferManager( config.s3Client = s3_file->s3_client; config.bufferSize = temp_value; // must be larger than pool size * multi part chunk size - config.transferBufferMaxHeapSize = (kExecutorPoolSize + 1) * temp_value; + config.transferBufferMaxHeapSize = (executor_pool_size + 1) * temp_value; s3_file->transfer_managers.emplace( direction, Aws::Transfer::TransferManager::Create(config)); }