Skip to content

Commit

Permalink
Add S3 metrics collection and reporting.
Browse files Browse the repository at this point in the history
  • Loading branch information
athmaja-n committed Aug 28, 2024
1 parent 2518463 commit d72c7fe
Show file tree
Hide file tree
Showing 11 changed files with 576 additions and 32 deletions.
29 changes: 18 additions & 11 deletions velox/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_exception Exceptions.cpp VeloxException.cpp Exceptions.h)
target_link_libraries(
velox_exception PUBLIC velox_flag_definitions velox_process Folly::folly
fmt::fmt gflags::gflags glog::glog)
velox_add_library(velox_exception Exceptions.cpp VeloxException.cpp
Exceptions.h)
velox_link_libraries(
velox_exception
PUBLIC velox_flag_definitions
velox_process
Folly::folly
fmt::fmt
gflags::gflags
glog::glog)

add_library(
velox_add_library(
velox_common_base
BitUtil.cpp
Counters.cpp
Expand All @@ -32,10 +38,11 @@ add_library(
StatsReporter.cpp
SuccinctPrinter.cpp)

target_link_libraries(
# Link the newly created velox_s3_metrics_aggregator library
velox_link_libraries(
velox_common_base
PUBLIC velox_exception Folly::folly fmt::fmt xsimd
PRIVATE velox_common_compression velox_process velox_test_util glog::glog)
PRIVATE velox_common_compression velox_process velox_test_util glog::glog velox_s3_metrics_aggregator)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand All @@ -45,8 +52,8 @@ if(${VELOX_ENABLE_BENCHMARKS})
add_subdirectory(benchmarks)
endif()

add_library(velox_id_map BigintIdMap.cpp)
target_link_libraries(
velox_add_library(velox_id_map BigintIdMap.cpp)
velox_link_libraries(
velox_id_map
velox_memory
velox_flag_definitions
Expand All @@ -56,8 +63,8 @@ target_link_libraries(
fmt::fmt
gflags::gflags)

add_library(velox_status Status.cpp)
target_link_libraries(
velox_add_library(velox_status Status.cpp)
velox_link_libraries(
velox_status
PUBLIC fmt::fmt Folly::folly
PRIVATE glog::glog)
99 changes: 98 additions & 1 deletion velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ PeriodicStatsReporter::PeriodicStatsReporter(const Options& options)
cache_(options.cache),
arbitrator_(options.arbitrator),
spillMemoryPool_(options.spillMemoryPool),
options_(options) {}
options_(options) {
#ifdef VELOX_ENABLE_S3
if (options.s3Metrics) {
addS3MetricsAggregatorTask(options.s3Metrics, options.s3MetricsIntervalMs);
}
#endif
}

void PeriodicStatsReporter::start() {
LOG(INFO) << "Starting PeriodicStatsReporter with options "
Expand Down Expand Up @@ -256,4 +262,95 @@ void PeriodicStatsReporter::reportSpillStats() {
RECORD_METRIC_VALUE(kMetricSpillPeakMemoryBytes, spillMemoryStats.peakBytes);
}

// Compiled only with S3 support, matching the guarded call in the constructor
// and the guarded S3 includes.
#ifdef VELOX_ENABLE_S3
/// Schedules the periodic task that publishes the S3 file system counters.
///
/// On every run the task reads each S3 metric from 's3Metrics' exactly once and
/// forwards it through RECORD_METRIC_VALUE. The connection and upload counters
/// are additionally written to the INFO log.
///
/// @param s3Metrics Aggregator the task reads from. It is dereferenced on every
/// run, so it must not be null; the task keeps its own copy of the shared_ptr.
/// @param intervalMs Interval handed to addTask for this task.
void PeriodicStatsReporter::addS3MetricsAggregatorTask(
    const std::shared_ptr<velox::filesystems::S3MetricsAggregator>& s3Metrics,
    uint64_t intervalMs) {
  addTask(
      "report_s3_metrics",
      [s3Metrics]() {
        // Reads 'key' once from the aggregator passed by the caller, reports
        // it, and returns the value so that the logged and the reported
        // numbers cannot diverge.
        const auto report = [&s3Metrics](const auto& key) {
          const auto value = s3Metrics->getMetric(key);
          RECORD_METRIC_VALUE(key, value);
          return value;
        };

        const auto activeConnections =
            report(velox::filesystems::kMetricS3ActiveConnections);
        const auto startedUploads =
            report(velox::filesystems::kMetricS3StartedUploads);
        const auto failedUploads =
            report(velox::filesystems::kMetricS3FailedUploads);
        const auto successfulUploads =
            report(velox::filesystems::kMetricS3SuccessfulUploads);

        LOG(INFO) << "Updating S3 metrics: "
                  << "ActiveConnections=" << activeConnections << ", "
                  << "StartedUploads=" << startedUploads << ", "
                  << "FailedUploads=" << failedUploads << ", "
                  << "SuccessfulUploads=" << successfulUploads;

        // Remaining counters are reported but not logged.
        report(velox::filesystems::kMetricS3MetadataCalls);
        report(velox::filesystems::kMetricS3ListStatusCalls);
        report(velox::filesystems::kMetricS3ListLocatedStatusCalls);
        report(velox::filesystems::kMetricS3ListObjectsCalls);
        report(velox::filesystems::kMetricS3OtherReadErrors);
        report(velox::filesystems::kMetricS3AwsAbortedExceptions);
        report(velox::filesystems::kMetricS3SocketExceptions);
        report(velox::filesystems::kMetricS3GetObjectErrors);
        report(velox::filesystems::kMetricS3GetMetadataErrors);
        report(velox::filesystems::kMetricS3GetObjectRetries);
        report(velox::filesystems::kMetricS3GetMetadataRetries);
        report(velox::filesystems::kMetricS3ReadRetries);
      },
      intervalMs);
}
#endif // VELOX_ENABLE_S3

} // namespace facebook::velox
23 changes: 21 additions & 2 deletions velox/common/base/PeriodicStatsReporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
#include "velox/common/caching/SsdFile.h"
#include "velox/common/memory/MemoryArbitrator.h"

#ifdef VELOX_ENABLE_S3
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"
#endif

namespace folly {
class CPUThreadPoolExecutor;
}
Expand Down Expand Up @@ -53,14 +58,25 @@ class PeriodicStatsReporter {
const memory::MemoryPool* spillMemoryPool{nullptr};
uint64_t spillStatsIntervalMs{60'000};

#ifdef VELOX_ENABLE_S3
std::shared_ptr<velox::filesystems::S3MetricsAggregator> s3Metrics{nullptr};
uint64_t s3MetricsIntervalMs{60'000};
#endif

std::string toString() const {
return fmt::format(
std::string result = fmt::format(
"allocatorStatsIntervalMs:{}, cacheStatsIntervalMs:{}, "
"arbitratorStatsIntervalMs:{}, spillStatsIntervalMs:{}",
allocatorStatsIntervalMs,
cacheStatsIntervalMs,
arbitratorStatsIntervalMs,
spillStatsIntervalMs);

#ifdef VELOX_ENABLE_S3
result += fmt::format(", s3MetricsIntervalMs:{}", s3MetricsIntervalMs);
#endif

return result;
}
};

Expand Down Expand Up @@ -95,7 +111,10 @@ class PeriodicStatsReporter {
void reportAllocatorStats();
void reportArbitratorStats();
void reportSpillStats();

#ifdef VELOX_ENABLE_S3
  // Adds the periodic task that reads the counters of 's3Metrics' and reports
  // them every 'intervalMs'. Declared only when S3 support is compiled in,
  // because the S3MetricsAggregator header is included under the same guard.
  void addS3MetricsAggregatorTask(
      const std::shared_ptr<velox::filesystems::S3MetricsAggregator>& s3Metrics,
      uint64_t intervalMs);
#endif

const velox::memory::MemoryAllocator* const allocator_{nullptr};
const velox::cache::AsyncDataCache* const cache_{nullptr};
const velox::memory::MemoryArbitrator* const arbitrator_{nullptr};
Expand Down
54 changes: 54 additions & 0 deletions velox/common/base/tests/StatsReporterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
#include "velox/common/caching/SsdCache.h"
#include "velox/common/memory/MmapAllocator.h"

#ifdef VELOX_ENABLE_S3
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"
#endif

namespace facebook::velox {

class TestReporter : public BaseStatsReporter {
Expand Down Expand Up @@ -612,6 +617,55 @@ TEST_F(PeriodicStatsReporterTest, allNullOption) {
ASSERT_NO_THROW(stopPeriodicStatsReporter());
}

// Verifies that counters bumped on the S3MetricsAggregator are published by
// the periodic reporter. When the build has no S3 support the test is reported
// as skipped instead of passing with an empty body.
TEST_F(PeriodicStatsReporterTest, s3MetricsReporting) {
#ifdef VELOX_ENABLE_S3
  auto s3Metrics = filesystems::S3MetricsAggregator::getInstance();
  ASSERT_NE(s3Metrics, nullptr) << "S3MetricsAggregator instance is null!";

  // The aggregator is obtained through getInstance(), so clear the counters
  // asserted below before starting.
  s3Metrics->resetMetric(filesystems::kMetricS3ActiveConnections);
  s3Metrics->resetMetric(filesystems::kMetricS3StartedUploads);
  s3Metrics->resetMetric(filesystems::kMetricS3FailedUploads);
  s3Metrics->resetMetric(filesystems::kMetricS3SuccessfulUploads);

  PeriodicStatsReporter::Options options;
  options.s3Metrics = s3Metrics;
  options.s3MetricsIntervalMs = 1000;

  PeriodicStatsReporter periodicReporter(options);
  periodicReporter.start();

  // Wait slightly longer than one reporting interval before changing the
  // counters.
  std::this_thread::sleep_for(std::chrono::milliseconds(1100));

  // Expected totals: ActiveConnections=1, StartedUploads=2, FailedUploads=1,
  // SuccessfulUploads=3.
  s3Metrics->incrementMetric(filesystems::kMetricS3ActiveConnections);
  s3Metrics->incrementMetric(filesystems::kMetricS3StartedUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3StartedUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3FailedUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3SuccessfulUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3SuccessfulUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3SuccessfulUploads);

  // Wait longer than one interval so the updated values get reported.
  std::this_thread::sleep_for(std::chrono::milliseconds(1500));

  periodicReporter.stop();

  const auto& counterMap = reporter_->counterMap;

  ASSERT_EQ(counterMap.count(filesystems::kMetricS3ActiveConnections), 1);
  ASSERT_EQ(counterMap.count(filesystems::kMetricS3StartedUploads), 1);
  ASSERT_EQ(counterMap.count(filesystems::kMetricS3FailedUploads), 1);
  ASSERT_EQ(counterMap.count(filesystems::kMetricS3SuccessfulUploads), 1);
  ASSERT_EQ(counterMap.at(filesystems::kMetricS3ActiveConnections), 1);
  ASSERT_EQ(counterMap.at(filesystems::kMetricS3StartedUploads), 2);
  ASSERT_EQ(counterMap.at(filesystems::kMetricS3FailedUploads), 1);
  ASSERT_EQ(counterMap.at(filesystems::kMetricS3SuccessfulUploads), 3);
#else
  GTEST_SKIP() << "S3 support is not enabled (VELOX_ENABLE_S3 is undefined).";
#endif
}

// Registering to folly Singleton with intended reporter type
folly::Singleton<BaseStatsReporter> reporter([]() {
return new TestReporter();
Expand Down
25 changes: 19 additions & 6 deletions velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# for generated headers
# For generated headers

# Build S3MetricsAggregator as its own library to avoid a cyclic dependency.
velox_add_library(velox_s3_metrics_aggregator STATIC S3MetricsAggregator.cpp)

# Link through the velox_* wrapper, like the other targets in this file that
# are created with velox_add_library, rather than calling target_link_libraries
# on the target directly.
velox_link_libraries(
  velox_s3_metrics_aggregator
  PUBLIC Folly::folly
  PRIVATE glog::glog)

# Create the velox_s3fs library
velox_add_library(velox_s3fs RegisterS3FileSystem.cpp)

add_library(velox_s3fs RegisterS3FileSystem.cpp)
if(VELOX_ENABLE_S3)
target_sources(velox_s3fs PRIVATE S3FileSystem.cpp S3Util.cpp)
velox_sources(velox_s3fs PRIVATE S3FileSystem.cpp S3Util.cpp)

target_include_directories(velox_s3fs PUBLIC ${AWSSDK_INCLUDE_DIRS})
target_link_libraries(velox_s3fs velox_dwio_common Folly::folly
${AWSSDK_LIBRARIES})
# Link necessary AWS SDK libraries and the S3 metrics aggregator
velox_include_directories(velox_s3fs PUBLIC ${AWSSDK_INCLUDE_DIRS})
velox_link_libraries(velox_s3fs velox_dwio_common Folly::folly
${AWSSDK_LIBRARIES} velox_s3_metrics_aggregator)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
28 changes: 28 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
#include "velox/core/QueryConfig.h"
#include "velox/dwio/common/DataBuffer.h"

// Include the S3 metrics headers
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"

#include <fmt/format.h>
#include <glog/logging.h>
#include <memory>
Expand Down Expand Up @@ -102,6 +106,10 @@ class S3ReadFile final : public ReadFile {
outcome, "Failed to get metadata for S3 object", bucket_, key_);
length_ = outcome.GetResult().GetContentLength();
VELOX_CHECK_GE(length_, 0);

// Increment the metadata call metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3MetadataCalls);
}

std::string_view pread(uint64_t offset, uint64_t length, void* buffer)
Expand Down Expand Up @@ -182,6 +190,10 @@ class S3ReadFile final : public ReadFile {
AwsWriteableStreamFactory(position, length));
auto outcome = client_->GetObject(request);
VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to get S3 object", bucket_, key_);

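// Count the GetObject call. NOTE(review): the counter incremented below is
// kMetricS3ListObjectsCalls, which does not match this code path - confirm
// the intended metric.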
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3ListObjectsCalls);
}

Aws::S3::S3Client* client_;
Expand Down Expand Up @@ -271,6 +283,10 @@ class S3WriteFile::Impl {
}

fileSize_ = 0;

// Increment the started uploads metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3StartedUploads);
}

// Appends data to the end of the file.
Expand Down Expand Up @@ -315,6 +331,10 @@ class S3WriteFile::Impl {
outcome, "Failed to complete multiple part upload", bucket_, key_);
}
currentPart_->clear();

// Increment the successful uploads metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3SuccessfulUploads);
}

// Current file size, i.e. the sum of all previous appends.
Expand Down Expand Up @@ -576,11 +596,19 @@ class S3FileSystem::Impl {
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
hiveConfig_->s3UseVirtualAddressing());
++fileSystemCount;

// Increment the active connections metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3ActiveConnections);
}

// Releases the S3 client and decrements fileSystemCount.
~Impl() {
client_.reset();
--fileSystemCount;

// NOTE(review): the intent appears to be decrementing the active connections
// metric, but the call below is incrementMetric - the same call that is made
// where fileSystemCount is incremented. As written, the metric only ever
// grows. Confirm and replace with the aggregator's decrement operation if it
// provides one.
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3ActiveConnections);
}

// Configure and return an AWSCredentialsProvider with access key and secret
Expand Down
Loading

0 comments on commit d72c7fe

Please sign in to comment.