Skip to content

Commit

Permalink
Add S3 metrics collection and reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
athmaja-n committed Aug 13, 2024
1 parent 01624d9 commit 34befee
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
4 changes: 3 additions & 1 deletion velox/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ add_library(

target_link_libraries(
velox_common_base
PUBLIC velox_exception Folly::folly fmt::fmt xsimd
#PUBLIC velox_exception Folly::folly fmt::fmt xsimd
#PRIVATE velox_common_compression velox_process velox_test_util glog::glog)
PUBLIC velox_exception velox_s3fs Folly::folly fmt::fmt xsimd
PRIVATE velox_common_compression velox_process velox_test_util glog::glog)

if(${VELOX_BUILD_TESTING})
Expand Down
31 changes: 31 additions & 0 deletions velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ void PeriodicStatsReporter::start() {
"report_spill_stats",
[this]() { reportSpillStats(); },
options_.spillStatsIntervalMs);
// Add S3 metrics task
addTask(
"report_s3_metrics",
[this]() { reportS3Metrics(); },
2000); // every 2 seconds
}

void PeriodicStatsReporter::stop() {
Expand Down Expand Up @@ -256,4 +261,30 @@ void PeriodicStatsReporter::reportSpillStats() {
RECORD_METRIC_VALUE(kMetricSpillPeakMemoryBytes, spillMemoryStats.peakBytes);
}

void PeriodicStatsReporter::reportS3Metrics() {
auto aggregator = facebook::velox::filesystems::S3MetricsAggregator::getInstance();
LOG(INFO) << "Updating S3 metrics: "
<< "ActiveConnections=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3ActiveConnections) << ", "
<< "StartedUploads=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3StartedUploads) << ", "
<< "FailedUploads=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3FailedUploads) << ", "
<< "SuccessfulUploads=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3SuccessfulUploads);

// Record the metrics
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ActiveConnections, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ActiveConnections));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3StartedUploads, aggregator->getMetric(facebook::velox::filesystems::kMetricS3StartedUploads));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3FailedUploads, aggregator->getMetric(facebook::velox::filesystems::kMetricS3FailedUploads));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3SuccessfulUploads, aggregator->getMetric(facebook::velox::filesystems::kMetricS3SuccessfulUploads));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3MetadataCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3MetadataCalls));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ListStatusCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ListStatusCalls));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ListLocatedStatusCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ListLocatedStatusCalls));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ListObjectsCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ListObjectsCalls));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3OtherReadErrors, aggregator->getMetric(facebook::velox::filesystems::kMetricS3OtherReadErrors));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3AwsAbortedExceptions, aggregator->getMetric(facebook::velox::filesystems::kMetricS3AwsAbortedExceptions));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3SocketExceptions, aggregator->getMetric(facebook::velox::filesystems::kMetricS3SocketExceptions));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetObjectErrors, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetObjectErrors));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetMetadataErrors, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetMetadataErrors));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetObjectRetries, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetObjectRetries));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetMetadataRetries, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetMetadataRetries));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ReadRetries, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ReadRetries));
}
} // namespace facebook::velox
4 changes: 4 additions & 0 deletions velox/common/base/PeriodicStatsReporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdFile.h"
#include "velox/common/memory/MemoryArbitrator.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"

namespace folly {
class CPUThreadPoolExecutor;
Expand Down Expand Up @@ -95,6 +97,8 @@ class PeriodicStatsReporter {
void reportAllocatorStats();
void reportArbitratorStats();
void reportSpillStats();
// Method for reporting S3 metrics
void reportS3Metrics();

const velox::memory::MemoryAllocator* const allocator_{nullptr};
const velox::cache::AsyncDataCache* const cache_{nullptr};
Expand Down

0 comments on commit 34befee

Please sign in to comment.