Skip to content

Commit

Permalink
Add S3 metrics collection and reporting.
Browse files Browse the repository at this point in the history
  • Loading branch information
athmaja-n committed Aug 14, 2024
1 parent 2518463 commit 97c61e8
Show file tree
Hide file tree
Showing 10 changed files with 348 additions and 2 deletions.
3 changes: 2 additions & 1 deletion velox/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ add_library(
StatsReporter.cpp
SuccinctPrinter.cpp)

# Link the newly created velox_s3_metrics_aggregator library
target_link_libraries(
velox_common_base
PUBLIC velox_exception Folly::folly fmt::fmt xsimd
PRIVATE velox_common_compression velox_process velox_test_util glog::glog)
PRIVATE velox_common_compression velox_process velox_test_util glog::glog velox_s3_metrics_aggregator)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
31 changes: 31 additions & 0 deletions velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ void PeriodicStatsReporter::start() {
"report_spill_stats",
[this]() { reportSpillStats(); },
options_.spillStatsIntervalMs);
// Add S3 metrics task
addTask(
"report_s3_metrics",
[this]() { reportS3Metrics(); },
2000); // every 2 seconds
}

void PeriodicStatsReporter::stop() {
Expand Down Expand Up @@ -256,4 +261,30 @@ void PeriodicStatsReporter::reportSpillStats() {
RECORD_METRIC_VALUE(kMetricSpillPeakMemoryBytes, spillMemoryStats.peakBytes);
}

void PeriodicStatsReporter::reportS3Metrics() {
auto aggregator = facebook::velox::filesystems::S3MetricsAggregator::getInstance();
LOG(INFO) << "Updating S3 metrics: "
<< "ActiveConnections=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3ActiveConnections) << ", "
<< "StartedUploads=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3StartedUploads) << ", "
<< "FailedUploads=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3FailedUploads) << ", "
<< "SuccessfulUploads=" << aggregator->getMetric(facebook::velox::filesystems::kMetricS3SuccessfulUploads);

// Record the metrics
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ActiveConnections, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ActiveConnections));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3StartedUploads, aggregator->getMetric(facebook::velox::filesystems::kMetricS3StartedUploads));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3FailedUploads, aggregator->getMetric(facebook::velox::filesystems::kMetricS3FailedUploads));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3SuccessfulUploads, aggregator->getMetric(facebook::velox::filesystems::kMetricS3SuccessfulUploads));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3MetadataCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3MetadataCalls));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ListStatusCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ListStatusCalls));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ListLocatedStatusCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ListLocatedStatusCalls));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ListObjectsCalls, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ListObjectsCalls));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3OtherReadErrors, aggregator->getMetric(facebook::velox::filesystems::kMetricS3OtherReadErrors));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3AwsAbortedExceptions, aggregator->getMetric(facebook::velox::filesystems::kMetricS3AwsAbortedExceptions));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3SocketExceptions, aggregator->getMetric(facebook::velox::filesystems::kMetricS3SocketExceptions));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetObjectErrors, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetObjectErrors));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetMetadataErrors, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetMetadataErrors));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetObjectRetries, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetObjectRetries));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3GetMetadataRetries, aggregator->getMetric(facebook::velox::filesystems::kMetricS3GetMetadataRetries));
RECORD_METRIC_VALUE(facebook::velox::filesystems::kMetricS3ReadRetries, aggregator->getMetric(facebook::velox::filesystems::kMetricS3ReadRetries));
}
} // namespace facebook::velox
4 changes: 4 additions & 0 deletions velox/common/base/PeriodicStatsReporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdFile.h"
#include "velox/common/memory/MemoryArbitrator.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"

namespace folly {
class CPUThreadPoolExecutor;
Expand Down Expand Up @@ -95,6 +97,8 @@ class PeriodicStatsReporter {
void reportAllocatorStats();
void reportArbitratorStats();
void reportSpillStats();
// Method for reporting S3 metrics
void reportS3Metrics();

const velox::memory::MemoryAllocator* const allocator_{nullptr};
const velox::cache::AsyncDataCache* const cache_{nullptr};
Expand Down
15 changes: 14 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,26 @@

# for generated headers

# Create a new library for S3MetricsAggregator to avoid cyclic dependency
add_library(velox_s3_metrics_aggregator STATIC
S3MetricsAggregator.cpp
)

target_include_directories(velox_s3_metrics_aggregator PUBLIC ${AWSSDK_INCLUDE_DIRS})

target_link_libraries(velox_s3_metrics_aggregator
PUBLIC Folly::folly
PRIVATE glog::glog
)

add_library(velox_s3fs RegisterS3FileSystem.cpp)
if(VELOX_ENABLE_S3)
target_sources(velox_s3fs PRIVATE S3FileSystem.cpp S3Util.cpp)

target_include_directories(velox_s3fs PUBLIC ${AWSSDK_INCLUDE_DIRS})

target_link_libraries(velox_s3fs velox_dwio_common Folly::folly
${AWSSDK_LIBRARIES})
${AWSSDK_LIBRARIES} velox_s3_metrics_aggregator)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
28 changes: 28 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
#include "velox/core/QueryConfig.h"
#include "velox/dwio/common/DataBuffer.h"

// Include the S3 metrics headers
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"

#include <fmt/format.h>
#include <glog/logging.h>
#include <memory>
Expand Down Expand Up @@ -102,6 +106,10 @@ class S3ReadFile final : public ReadFile {
outcome, "Failed to get metadata for S3 object", bucket_, key_);
length_ = outcome.GetResult().GetContentLength();
VELOX_CHECK_GE(length_, 0);

// Increment the metadata call metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3MetadataCalls);
}

std::string_view pread(uint64_t offset, uint64_t length, void* buffer)
Expand Down Expand Up @@ -182,6 +190,10 @@ class S3ReadFile final : public ReadFile {
AwsWriteableStreamFactory(position, length));
auto outcome = client_->GetObject(request);
VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to get S3 object", bucket_, key_);

// Increment the get object metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3ListObjectsCalls);
}

Aws::S3::S3Client* client_;
Expand Down Expand Up @@ -271,6 +283,10 @@ class S3WriteFile::Impl {
}

fileSize_ = 0;

// Increment the started uploads metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3StartedUploads);
}

// Appends data to the end of the file.
Expand Down Expand Up @@ -315,6 +331,10 @@ class S3WriteFile::Impl {
outcome, "Failed to complete multiple part upload", bucket_, key_);
}
currentPart_->clear();

// Increment the successful uploads metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3SuccessfulUploads);
}

// Current file size, i.e. the sum of all previous appends.
Expand Down Expand Up @@ -576,11 +596,19 @@ class S3FileSystem::Impl {
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
hiveConfig_->s3UseVirtualAddressing());
++fileSystemCount;

// Increment the active connections metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3ActiveConnections);
}

~Impl() {
client_.reset();
--fileSystemCount;

// Decrement the active connections metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3ActiveConnections);
}

// Configure and return an AWSCredentialsProvider with access key and secret
Expand Down
56 changes: 56 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

namespace facebook::velox::filesystems {

/// Metric names for S3 FileSystem.
/// These metrics are used for monitoring and reporting various S3 operations.
constexpr auto kMetricS3ActiveConnections =
"presto_hive_s3_presto_s3_file_system_active_connections_total_count";
constexpr auto kMetricS3StartedUploads =
"presto_hive_s3_presto_s3_file_system_started_uploads_one_minute_count";
constexpr auto kMetricS3FailedUploads =
"presto_hive_s3_presto_s3_file_system_failed_uploads_one_minute_count";
constexpr auto kMetricS3SuccessfulUploads =
"presto_hive_s3_presto_s3_file_system_successful_uploads_one_minute_count";
constexpr auto kMetricS3MetadataCalls =
"presto_hive_s3_presto_s3_file_system_metadata_calls_one_minute_count";
constexpr auto kMetricS3ListStatusCalls =
"presto_hive_s3_presto_s3_file_system_list_status_calls_one_minute_count";
constexpr auto kMetricS3ListLocatedStatusCalls =
"presto_hive_s3_presto_s3_file_system_list_located_status_calls_one_minute_count";
constexpr auto kMetricS3ListObjectsCalls =
"presto_hive_s3_presto_s3_file_system_list_objects_calls_one_minute_count";
constexpr auto kMetricS3OtherReadErrors =
"presto_hive_s3_presto_s3_file_system_other_read_errors_one_minute_count";
constexpr auto kMetricS3AwsAbortedExceptions =
"presto_hive_s3_presto_s3_file_system_aws_aborted_exceptions_one_minute_count";
constexpr auto kMetricS3SocketExceptions =
"presto_hive_s3_presto_s3_file_system_socket_exceptions_one_minute_count";
constexpr auto kMetricS3GetObjectErrors =
"presto_hive_s3_presto_s3_file_system_get_object_errors_one_minute_count";
constexpr auto kMetricS3GetMetadataErrors =
"presto_hive_s3_presto_s3_file_system_get_metadata_errors_one_minute_count";
constexpr auto kMetricS3GetObjectRetries =
"presto_hive_s3_presto_s3_file_system_get_object_retries_one_minute_count";
constexpr auto kMetricS3GetMetadataRetries =
"presto_hive_s3_presto_s3_file_system_get_metadata_retries_one_minute_count";
constexpr auto kMetricS3ReadRetries =
"presto_hive_s3_presto_s3_file_system_read_retries_one_minute_count";

} // namespace facebook::velox::filesystems
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"
#include <glog/logging.h>
#include <folly/Singleton.h>

namespace facebook::velox::filesystems {

namespace {
folly::Singleton<S3MetricsAggregator> s3MetricsAggregatorSingleton;
} // namespace

std::shared_ptr<S3MetricsAggregator> S3MetricsAggregator::getInstance() {
return s3MetricsAggregatorSingleton.try_get();
}

S3MetricsAggregator::S3MetricsAggregator() {
std::lock_guard<std::mutex> guard(mutex_);
std::vector<std::string> metricNames = {
"presto_hive_s3_presto_s3_file_system_active_connections_total_count",
"presto_hive_s3_presto_s3_file_system_started_uploads_one_minute_count",
"presto_hive_s3_presto_s3_file_system_failed_uploads_one_minute_count",
"presto_hive_s3_presto_s3_file_system_successful_uploads_one_minute_count",
"presto_hive_s3_presto_s3_file_system_metadata_calls_one_minute_count",
"presto_hive_s3_presto_s3_file_system_list_status_calls_one_minute_count",
"presto_hive_s3_presto_s3_file_system_list_located_status_calls_one_minute_count",
"presto_hive_s3_presto_s3_file_system_list_objects_calls_one_minute_count",
"presto_hive_s3_presto_s3_file_system_other_read_errors_one_minute_count",
"presto_hive_s3_presto_s3_file_system_aws_aborted_exceptions_one_minute_count",
"presto_hive_s3_presto_s3_file_system_socket_exceptions_one_minute_count",
"presto_hive_s3_presto_s3_file_system_get_object_errors_one_minute_count",
"presto_hive_s3_presto_s3_file_system_get_metadata_errors_one_minute_count",
"presto_hive_s3_presto_s3_file_system_get_object_retries_one_minute_count",
"presto_hive_s3_presto_s3_file_system_get_metadata_retries_one_minute_count",
"presto_hive_s3_presto_s3_file_system_read_retries_one_minute_count"};

for (const auto& name : metricNames) {
metrics_[name] = 0;
}
}

// Increment the specified metric
void S3MetricsAggregator::incrementMetric(const std::string& metricName) {
std::lock_guard<std::mutex> guard(mutex_);
metrics_[metricName]++;
LOG(INFO) << "Incremented metric " << metricName << " to "
<< metrics_[metricName];
}

// Retrieve the current value of the specified metric
uint64_t S3MetricsAggregator::getMetric(const std::string& metricName) {
std::lock_guard<std::mutex> guard(mutex_);
return metrics_[metricName];
}

// Reset the specified metric
void S3MetricsAggregator::resetMetric(const std::string& metricName) {
std::lock_guard<std::mutex> guard(mutex_);
metrics_[metricName] = 0;
LOG(INFO) << "Reset metric " << metricName << " to 0";
}

} // namespace facebook::velox::filesystems
43 changes: 43 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef VELOX_CONNECTORS_HIVE_STORAGE_ADAPTERS_S3FS_S3METRICSAGGREGATOR_H_
#define VELOX_CONNECTORS_HIVE_STORAGE_ADAPTERS_S3FS_S3METRICSAGGREGATOR_H_

#include <folly/Singleton.h>
#include <mutex>
#include <unordered_map>

namespace facebook::velox::filesystems {

class S3MetricsAggregator {
public:
static std::shared_ptr<S3MetricsAggregator> getInstance();

void incrementMetric(const std::string& metricName);
uint64_t getMetric(const std::string& metricName);
void resetMetric(const std::string& metricName);

private:
S3MetricsAggregator();
std::unordered_map<std::string, uint64_t> metrics_;
std::mutex mutex_;
friend class folly::Singleton<S3MetricsAggregator>;
};

} // namespace facebook::velox::filesystems

#endif // VELOX_CONNECTORS_HIVE_STORAGE_ADAPTERS_S3FS_S3METRICSAGGREGATOR_H_
14 changes: 14 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,17 @@ target_link_libraries(
velox_exec
gtest
gtest_main)

add_executable(velox_s3metrics_aggregator_test S3MetricsAggregatorTest.cpp)
add_test(velox_s3metrics_aggregator_test velox_s3metrics_aggregator_test)
target_link_libraries(
velox_s3metrics_aggregator_test
velox_file
velox_s3fs
velox_hive_config
velox_core
velox_exec_test_lib
velox_dwio_common_exception
velox_exec
gtest
gtest_main)
Loading

0 comments on commit 97c61e8

Please sign in to comment.