diff --git a/velox/common/base/CMakeLists.txt b/velox/common/base/CMakeLists.txt index d70afb59d639..7645c07c4465 100644 --- a/velox/common/base/CMakeLists.txt +++ b/velox/common/base/CMakeLists.txt @@ -34,7 +34,9 @@ add_library( target_link_libraries( velox_common_base - PUBLIC velox_exception Folly::folly fmt::fmt xsimd + #PUBLIC velox_exception Folly::folly fmt::fmt xsimd + #PRIVATE velox_common_compression velox_process velox_test_util glog::glog) + PUBLIC velox_exception velox_s3fs Folly::folly fmt::fmt xsimd PRIVATE velox_common_compression velox_process velox_test_util glog::glog) if(${VELOX_BUILD_TESTING}) diff --git a/velox/common/base/PeriodicStatsReporter.cpp b/velox/common/base/PeriodicStatsReporter.cpp index 5bd8781a6b08..5b73d259f787 100644 --- a/velox/common/base/PeriodicStatsReporter.cpp +++ b/velox/common/base/PeriodicStatsReporter.cpp @@ -84,6 +84,11 @@ void PeriodicStatsReporter::start() { "report_spill_stats", [this]() { reportSpillStats(); }, options_.spillStatsIntervalMs); +// Add S3 metrics task + addTask( + "report_s3_metrics", + [this]() { reportS3Metrics(); }, + 2000); // every 2 seconds } void PeriodicStatsReporter::stop() { @@ -256,4 +261,30 @@ void PeriodicStatsReporter::reportSpillStats() { RECORD_METRIC_VALUE(kMetricSpillPeakMemoryBytes, spillMemoryStats.peakBytes); } +void PeriodicStatsReporter::reportS3Metrics() { + auto aggregator = facebook::velox::filesystems::S3MetricsAggregator::getInstance(); + LOG(INFO) << "Updating S3 metrics: " + << "ActiveConnections=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3ActiveConnections) << ", " + << "StartedUploads=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3StartedUploads) << ", " + << "FailedUploads=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3FailedUploads) << ", " + << "SuccessfulUploads=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3SuccessfulUploads); + + // Record the metrics + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ActiveConnections, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ActiveConnections)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3StartedUploads, aggregator->getMetric(facebook::velox::filesystems::kMetricS3StartedUploads)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3FailedUploads, aggregator->getMetric(facebook::velox::filesystems::kMetricS3FailedUploads)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3SuccessfulUploads, aggregator->getMetric(facebook::velox::filesystems::kMetricS3SuccessfulUploads)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3MetadataCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3MetadataCalls)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ListStatusCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ListStatusCalls)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ListLocatedStatusCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ListLocatedStatusCalls)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ListObjectsCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ListObjectsCalls)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3OtherReadErrors, aggregator->getMetric(facebook::velox::filesystems::kMetricS3OtherReadErrors)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3AwsAbortedExceptions, aggregator->getMetric(facebook::velox::filesystems::kMetricS3AwsAbortedExceptions)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3SocketExceptions, aggregator->getMetric(facebook::velox::filesystems::kMetricS3SocketExceptions)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetObjectErrors, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetObjectErrors)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetMetadataErrors, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetMetadataErrors)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetObjectRetries, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetObjectRetries)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetMetadataRetries, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetMetadataRetries)); + RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ReadRetries, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ReadRetries)); +} } // namespace facebook::velox diff --git a/velox/common/base/PeriodicStatsReporter.h b/velox/common/base/PeriodicStatsReporter.h index a902f3de8222..0b1e9b7b916b 100644 --- a/velox/common/base/PeriodicStatsReporter.h +++ b/velox/common/base/PeriodicStatsReporter.h @@ -20,6 +20,8 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/caching/SsdFile.h" #include "velox/common/memory/MemoryArbitrator.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h" +#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h" namespace folly { class CPUThreadPoolExecutor; @@ -95,6 +97,8 @@ class PeriodicStatsReporter { void reportAllocatorStats(); void reportArbitratorStats(); void reportSpillStats(); + // Method for reporting S3 metrics + void reportS3Metrics(); const velox::memory::MemoryAllocator* const allocator_{nullptr}; const velox::cache::AsyncDataCache* const cache_{nullptr};