Skip to content

Commit

Permalink
Enhance S3FileSystem with S3 Metrics Collection
Browse files Browse the repository at this point in the history
  • Loading branch information
athmaja-n committed Sep 26, 2024
1 parent db8875c commit 5c5774c
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 10 deletions.
74 changes: 73 additions & 1 deletion velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,29 @@ PeriodicStatsReporter::PeriodicStatsReporter(const Options& options)
cache_(options.cache),
arbitrator_(options.arbitrator),
spillMemoryPool_(options.spillMemoryPool),
options_(options) {}
options_(options),
lastCacheStats_(),
#ifdef VELOX_ENABLE_S3
s3FileSystem_(options.s3FileSystem) // Initialize s3FileSystem_ last
#endif
{
#ifdef VELOX_ENABLE_S3
VELOX_CHECK(
s3FileSystem_ || options.s3MetricsIntervalMs == 0,
"S3FileSystem must be provided when VELOX_ENABLE_S3 is enabled and s3MetricsIntervalMs is greater than 0");
#endif
}

void PeriodicStatsReporter::start() {
LOG(INFO) << "Starting PeriodicStatsReporter with options "
<< options_.toString();

#ifdef VELOX_ENABLE_S3
if (s3FileSystem_ && options_.s3MetricsIntervalMs > 0) {
addS3MetricsTask(options_.s3MetricsIntervalMs);
}
#endif

addTask(
"report_allocator_stats",
[this]() { reportAllocatorStats(); },
Expand Down Expand Up @@ -256,4 +274,58 @@ void PeriodicStatsReporter::reportSpillStats() {
RECORD_METRIC_VALUE(kMetricSpillPeakMemoryBytes, spillMemoryStats.peakBytes);
}

void PeriodicStatsReporter::addS3MetricsTask(uint64_t intervalMs) {
addTask(
"report_s3_metrics",
[this]() {
auto& s3Metrics = s3FileSystem_->getMetrics();

// Log and record metrics
LOG(INFO) << "Updating S3 metrics: "
<< "ActiveConnections=" << s3Metrics.activeConnections << ", "
<< "StartedUploads=" << s3Metrics.startedUploads << ", "
<< "FailedUploads=" << s3Metrics.failedUploads << ", "
<< "SuccessfulUploads=" << s3Metrics.successfulUploads;

// Record COUNT metrics.
RECORD_METRIC_VALUE(
filesystems::kMetricS3ActiveConnections, s3Metrics.activeConnections);
RECORD_METRIC_VALUE(filesystems::kMetricS3MetadataCalls, s3Metrics.metadataCalls);
RECORD_METRIC_VALUE(
filesystems::kMetricS3ListStatusCalls, s3Metrics.listStatusCalls);
RECORD_METRIC_VALUE(
filesystems::kMetricS3ListLocatedStatusCalls, s3Metrics.listLocatedStatusCalls);
RECORD_METRIC_VALUE(
filesystems::kMetricS3ListObjectsCalls, s3Metrics.listObjectsCalls);
RECORD_METRIC_VALUE(
filesystems::kMetricS3OtherReadErrors, s3Metrics.otherReadErrors);
RECORD_METRIC_VALUE(
filesystems::kMetricS3AwsAbortedExceptions, s3Metrics.awsAbortedExceptions);
RECORD_METRIC_VALUE(
filesystems::kMetricS3SocketExceptions, s3Metrics.socketExceptions);
RECORD_METRIC_VALUE(
filesystems::kMetricS3GetObjectErrors, s3Metrics.getObjectErrors);
RECORD_METRIC_VALUE(
filesystems::kMetricS3GetMetadataErrors, s3Metrics.getMetadataErrors);
RECORD_METRIC_VALUE(
filesystems::kMetricS3GetObjectRetries, s3Metrics.getObjectRetries);
RECORD_METRIC_VALUE(
filesystems::kMetricS3GetMetadataRetries, s3Metrics.getMetadataRetries);
RECORD_METRIC_VALUE(filesystems::kMetricS3ReadRetries, s3Metrics.readRetries);

// Record SUM metrics using delta values.
RECORD_METRIC_VALUE(
filesystems::kMetricS3StartedUploads, s3Metrics.getDelta("startedUploads"));
RECORD_METRIC_VALUE(
filesystems::kMetricS3FailedUploads, s3Metrics.getDelta("failedUploads"));
RECORD_METRIC_VALUE(
filesystems::kMetricS3SuccessfulUploads,
s3Metrics.getDelta("successfulUploads"));

// Reset deltas after reporting.
s3Metrics.resetDeltas();
},
intervalMs);
}

} // namespace facebook::velox
17 changes: 17 additions & 0 deletions velox/common/base/PeriodicStatsReporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
#pragma once

#include <folly/experimental/ThreadedRepeatingFunctionRunner.h>
#include <memory>
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdFile.h"
#include "velox/common/memory/MemoryArbitrator.h"

#ifdef VELOX_ENABLE_S3
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"
#endif

namespace folly {
class CPUThreadPoolExecutor;
}
Expand Down Expand Up @@ -53,6 +59,10 @@ class PeriodicStatsReporter {
const memory::MemoryPool* spillMemoryPool{nullptr};
uint64_t spillStatsIntervalMs{60'000};

#ifdef VELOX_ENABLE_S3
uint64_t s3MetricsIntervalMs{60'000};
#endif

std::string toString() const {
return fmt::format(
"allocatorStatsIntervalMs:{}, cacheStatsIntervalMs:{}, "
Expand Down Expand Up @@ -95,6 +105,8 @@ class PeriodicStatsReporter {
void reportAllocatorStats();
void reportArbitratorStats();
void reportSpillStats();
// Method for adding the S3 metrics aggregator task.
void addS3MetricsAggregatorTask(uint64_t intervalMs);

const velox::memory::MemoryAllocator* const allocator_{nullptr};
const velox::cache::AsyncDataCache* const cache_{nullptr};
Expand All @@ -105,6 +117,11 @@ class PeriodicStatsReporter {
cache::CacheStats lastCacheStats_;

folly::ThreadedRepeatingFunctionRunner scheduler_;

// Add the s3FileSystem_ member here.
#ifdef VELOX_ENABLE_S3
std::shared_ptr<filesystems::S3FileSystem> s3FileSystem_;
#endif
};

/// Initializes and starts the process-wide periodic stats reporter. Before
Expand Down
42 changes: 34 additions & 8 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/

#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
#include "velox/core/QueryConfig.h"
Expand Down Expand Up @@ -102,6 +104,9 @@ class S3ReadFile final : public ReadFile {
outcome, "Failed to get metadata for S3 object", bucket_, key_);
length_ = outcome.GetResult().GetContentLength();
VELOX_CHECK_GE(length_, 0);

// Increment the metadata call metric
impl_->getMetrics().increment("metadataCalls");
}

std::string_view pread(uint64_t offset, uint64_t length, void* buffer)
Expand Down Expand Up @@ -182,6 +187,8 @@ class S3ReadFile final : public ReadFile {
AwsWriteableStreamFactory(position, length));
auto outcome = client_->GetObject(request);
VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to get S3 object", bucket_, key_);

impl_->getMetrics().increment("listObjectsCalls");
}

Aws::S3::S3Client* client_;
Expand Down Expand Up @@ -271,6 +278,8 @@ class S3WriteFile::Impl {
}

fileSize_ = 0;

impl_->getMetrics().increment("startedUploads");
}

// Appends data to the end of the file.
Expand Down Expand Up @@ -315,6 +324,8 @@ class S3WriteFile::Impl {
outcome, "Failed to complete multiple part upload", bucket_, key_);
}
currentPart_->clear();

impl_->getMetrics().increment("successfulUploads");
}

// Current file size, i.e. the sum of all previous appends.
Expand Down Expand Up @@ -520,12 +531,13 @@ void finalizeS3() {

class S3FileSystem::Impl {
public:
Impl(const config::ConfigBase* config) {
hiveConfig_ = std::make_shared<HiveConfig>(
std::make_shared<config::ConfigBase>(config->rawConfigsCopy()));
VELOX_CHECK(getAwsInstance()->isInitialized(), "S3 is not initialized");
Aws::Client::ClientConfiguration clientConfig;
clientConfig.endpointOverride = hiveConfig_->s3Endpoint();
Impl(const config::ConfigBase* config)
: client_(), metrics_() { // Initialize metrics_ here
hiveConfig_ = std::make_shared<HiveConfig>(
std::make_shared<config::ConfigBase>(config->rawConfigsCopy()));
VELOX_CHECK(getAwsInstance()->isInitialized(), "S3 is not initialized");
Aws::Client::ClientConfiguration clientConfig;
clientConfig.endpointOverride = hiveConfig_->s3Endpoint();

if (hiveConfig_->s3UseProxyFromEnv()) {
auto proxyConfig = S3ProxyConfigurationBuilder(hiveConfig_->s3Endpoint())
Expand Down Expand Up @@ -580,12 +592,18 @@ class S3FileSystem::Impl {
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
hiveConfig_->s3UseVirtualAddressing());
++fileSystemCount;
}

// Increment active connections metric.
metrics_.increment("activeConnections");
}

~Impl() {
client_.reset();
--fileSystemCount;
}

// decrement active connections metric.
metrics_.increment("activeConnections");
}

// Configure and return an AWSCredentialsProvider with access key and secret
// key.
Expand Down Expand Up @@ -716,6 +734,7 @@ class S3FileSystem::Impl {
private:
std::shared_ptr<HiveConfig> hiveConfig_;
std::shared_ptr<Aws::S3::S3Client> client_;
S3Metrics metrics_;
};

S3FileSystem::S3FileSystem(std::shared_ptr<const config::ConfigBase> config)
Expand All @@ -727,6 +746,13 @@ std::string S3FileSystem::getLogLevelName() const {
return impl_->getLogLevelName();
}

S3Metrics& S3FileSystem::getMetrics() {
return metrics_;
}
void S3FileSystem::resetMetricsDeltas() {
metrics_.resetDeltas(); // Reset SUM metric deltas after reporting.
}

std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
Expand Down
93 changes: 92 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,95 @@

#pragma once

#include <memory>
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConfig.h"

namespace facebook::velox::filesystems {
using namespace facebook::velox::connector::hive;

// Struct to hold S3-related metrics with delta tracking.
// COUNT metrics track current state; SUM metrics track deltas for one-minute count reporting.
struct S3Metrics {
// COUNT metric: Tracks the current number of active S3 connections.
uint64_t activeConnections{0};

// SUM metrics with delta tracking for one-minute count reporting.
uint64_t startedUploads{0}, prevStartedUploads{0};
uint64_t failedUploads{0}, prevFailedUploads{0};
uint64_t successfulUploads{0}, prevSuccessfulUploads{0};

// COUNT metrics track call or error occurrences.
uint64_t metadataCalls{0};
uint64_t listStatusCalls{0};
uint64_t listLocatedStatusCalls{0};
uint64_t listObjectsCalls{0};
uint64_t otherReadErrors{0};
uint64_t awsAbortedExceptions{0};
uint64_t socketExceptions{0};
uint64_t getObjectErrors{0};
uint64_t getMetadataErrors{0};
uint64_t getObjectRetries{0};
uint64_t getMetadataRetries{0};
uint64_t readRetries{0};

// Increment a metric.
void increment(const std::string& metricName) {
if (metricName == "activeConnections") {
++activeConnections;
} else if (metricName == "startedUploads") {
++startedUploads;
} else if (metricName == "failedUploads") {
++failedUploads;
} else if (metricName == "successfulUploads") {
++successfulUploads;
} else if (metricName == "metadataCalls") {
++metadataCalls;
} else if (metricName == "listStatusCalls") {
++listStatusCalls;
} else if (metricName == "listLocatedStatusCalls") {
++listLocatedStatusCalls;
} else if (metricName == "listObjectsCalls") {
++listObjectsCalls;
} else if (metricName == "otherReadErrors") {
++otherReadErrors;
} else if (metricName == "awsAbortedExceptions") {
++awsAbortedExceptions;
} else if (metricName == "socketExceptions") {
++socketExceptions;
} else if (metricName == "getObjectErrors") {
++getObjectErrors;
} else if (metricName == "getMetadataErrors") {
++getMetadataErrors;
} else if (metricName == "getObjectRetries") {
++getObjectRetries;
} else if (metricName == "getMetadataRetries") {
++getMetadataRetries;
} else if (metricName == "readRetries") {
++readRetries;
}
}

// Get the delta value for SUM-type metrics.
uint64_t getDelta(const std::string& metricName) {
if (metricName == "startedUploads") {
return startedUploads - prevStartedUploads;
} else if (metricName == "failedUploads") {
return failedUploads - prevFailedUploads;
} else if (metricName == "successfulUploads") {
return successfulUploads - prevSuccessfulUploads;
}
return 0; // COUNT metrics do not track deltas.
}

// Reset previous SUM metric values to current values for delta tracking.
void resetDeltas() {
prevStartedUploads = startedUploads;
prevFailedUploads = failedUploads;
prevSuccessfulUploads = successfulUploads;
}
};

bool initializeS3(const config::ConfigBase* config);

void finalizeS3();
Expand Down Expand Up @@ -72,9 +155,17 @@ class S3FileSystem : public FileSystem {

std::string getLogLevelName() const;

protected:
// Access the metrics struct.
S3Metrics& getMetrics();

// Reset the delta values for SUM-type metrics.
void resetMetricsDeltas();
S3Metrics metrics_; // Struct for tracking S3 metrics.

protected:
class Impl;
std::shared_ptr<Impl> impl_;

};

} // namespace facebook::velox::filesystems
Loading

0 comments on commit 5c5774c

Please sign in to comment.