Skip to content

Commit

Permalink
Add option for admission control for filesystem resources (facebookin…
Browse files Browse the repository at this point in the history
…cubator#11146)

Summary:

This change adds a generic admission controller class that can be
used for filesystem resources like read bytes in flight or number
of read requests in flight. It also provides a way to report stats
for resource usage, queued count, queued wait times by allowing the
client to specify a metric name.

Reviewed By: xiaoxmeng

Differential Revision: D63728285
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Oct 3, 2024
1 parent 583547f commit da62ee1
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 0 deletions.
91 changes: 91 additions & 0 deletions velox/common/base/AdmissionController.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/base/AdmissionController.h"

#include "velox/common/base/Exceptions.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/time/Timer.h"

namespace facebook::velox::common {

void AdmissionController::accept(uint64_t resourceUnits) {
ContinueFuture future;
uint64_t updatedValue = 0;
VELOX_CHECK_LE(
resourceUnits,
config_.maxLimit,
"A single request cannot exceed the max limit");
{
std::lock_guard<std::mutex> l(mu_);
if (unitsUsed_ + resourceUnits > config_.maxLimit) {
auto [unblockPromise, unblockFuture] = makeVeloxContinuePromiseContract();
Request req;
req.unitsRequested = resourceUnits;
req.promise = std::move(unblockPromise);
queue_.push_back(std::move(req));
future = std::move(unblockFuture);
} else {
updatedValue = unitsUsed_ += resourceUnits;
}
}
if (!future.valid()) {
// Only upadate if there was no wait, as the releasing thread is responsible
// for updating the metric.
if (!config_.resourceUsageAvgMetric.empty()) {
RECORD_METRIC_VALUE(config_.resourceUsageAvgMetric, updatedValue);
}
return;
}
if (!config_.resourceQueuedCountMetric.empty()) {
RECORD_METRIC_VALUE(config_.resourceQueuedCountMetric);
}
uint64_t waitTimeUs{0};
{
MicrosecondTimer timer(&waitTimeUs);
future.wait();
}
if (!config_.resourceQueuedTimeMsHistogramMetric.empty()) {
RECORD_HISTOGRAM_METRIC_VALUE(
config_.resourceQueuedTimeMsHistogramMetric, waitTimeUs / 1'000);
}
}

void AdmissionController::release(uint64_t resourceUnits) {
uint64_t updatedValue = 0;
{
std::lock_guard<std::mutex> l(mu_);
VELOX_CHECK_LE(
resourceUnits,
unitsUsed_,
"Cannot release more units than have been acquired");
unitsUsed_ -= resourceUnits;
while (!queue_.empty()) {
auto& request = queue_.front();
if (unitsUsed_ + request.unitsRequested > config_.maxLimit) {
break;
}
unitsUsed_ += request.unitsRequested;
request.promise.setValue();
queue_.pop_front();
}
updatedValue = unitsUsed_;
}
if (!config_.resourceUsageAvgMetric.empty()) {
RECORD_METRIC_VALUE(config_.resourceUsageAvgMetric, updatedValue);
}
}
} // namespace facebook::velox::common
67 changes: 67 additions & 0 deletions velox/common/base/AdmissionController.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <deque>
#include <mutex>
#include "velox/common/future/VeloxPromise.h"

namespace facebook::velox::common {

/// A generic admission controller that can be used to limit the number of
/// resources in use and can log metrics like resource usage, queued count,
/// queued wait times. When a calling thread's request for resources surpasses
/// the set limit, it will be placed in a FIFO queue. The thread must then wait
/// until sufficient resources are freed by other threads, addressing all
/// preceding requests in the queue, before its own request can be granted.
class AdmissionController {
public:
struct Config {
/// The maximum number of resource units that can be used at any given time.
/// Set to a default value of max unit64 to signify unlimited limit.
uint64_t maxLimit{std::numeric_limits<uint64_t>::max()};
/// The metric name for resource usage. If not set, it will not be reported.
/// Should be a registered as a average metric.
std::string resourceUsageAvgMetric;
/// The metric name for resource queued count. If not set, it will not be
/// reported. Should be a registered as a count metric.
std::string resourceQueuedCountMetric;
/// The metric name for resource queued wait time. If not set, it will not
/// be reported. Should be a registered as a histogram metric.
std::string resourceQueuedTimeMsHistogramMetric;
};
explicit AdmissionController(const Config& config) : config_(config) {}

// Accept can block until sufficient resources are freed by other threads.
void accept(uint64_t resourceUnits);
void release(uint64_t resourceUnits);

uint64_t currentResourceUsage() const {
std::lock_guard<std::mutex> l(mu_);
return unitsUsed_;
}

private:
struct Request {
uint64_t unitsRequested;
ContinuePromise promise;
};
Config config_;
mutable std::mutex mu_;
uint64_t unitsUsed_{0};
std::deque<Request> queue_;
};
} // namespace facebook::velox::common
1 change: 1 addition & 0 deletions velox/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ velox_link_libraries(

velox_add_library(
velox_common_base
AdmissionController.cpp
BitUtil.cpp
Counters.cpp
Fs.cpp
Expand Down
82 changes: 82 additions & 0 deletions velox/common/base/tests/AdmissionControllerTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/base/AdmissionController.h"
#include <gtest/gtest.h>
#include <atomic>
#include "velox/common/base/VeloxException.h"
#include "velox/common/base/tests/GTestUtils.h"

using namespace facebook::velox;
namespace facebook::velox::common {
TEST(AdmissionController, basic) {
const uint64_t kLimit = 100000;
AdmissionController::Config config;
config.maxLimit = kLimit;
AdmissionController admissionController(config);
EXPECT_EQ(admissionController.currentResourceUsage(), 0);

admissionController.accept(100);
EXPECT_EQ(admissionController.currentResourceUsage(), 100);

admissionController.accept(100);
EXPECT_EQ(admissionController.currentResourceUsage(), 200);

admissionController.release(100);
EXPECT_EQ(admissionController.currentResourceUsage(), 100);

VELOX_ASSERT_THROW(
admissionController.release(101),
"Cannot release more units than have been acquired");

VELOX_ASSERT_THROW(
admissionController.accept(kLimit + 1),
"A single request cannot exceed the max limit");
}

TEST(AdmissionController, multiThreaded) {
// Ensure that resource usage never exceeds the limit set in the admission
// controller.
const uint64_t kLimit = 30;
std::atomic_uint64_t currentUsage{0};
AdmissionController::Config config;
config.maxLimit = kLimit;
AdmissionController admissionController(config);

std::vector<std::thread> threads;
for (int i = 0; i < 20; i++) {
threads.push_back(std::thread([&]() {
for (int j = 0; j < 10'000; j++) {
uint64_t usageUnits = std::rand() % (kLimit + 2);
if (usageUnits > kLimit) {
VELOX_ASSERT_THROW(
admissionController.accept(usageUnits),
"A single request cannot exceed the max limit")
continue;
}
admissionController.accept(usageUnits);
uint64_t curr = currentUsage.fetch_add(usageUnits);
ASSERT_LE(curr + usageUnits, kLimit);
currentUsage.fetch_sub(usageUnits);
admissionController.release(usageUnits);
}
}));
}
for (auto& thread : threads) {
thread.join();
}
}
} // namespace facebook::velox::common
1 change: 1 addition & 0 deletions velox/common/base/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

add_executable(
velox_base_test
AdmissionControllerTest.cpp
AsyncSourceTest.cpp
BitUtilTest.cpp
BloomFilterTest.cpp
Expand Down

0 comments on commit da62ee1

Please sign in to comment.