Skip to content

Commit

Permalink
Add option for admission control for filesystem resources
Browse files Browse the repository at this point in the history
Summary:
This change adds a generic admission controller class that can be
used for filesystem resources like read bytes in flight or number
of read requests in flight. It also provides a way to report stats
for resource usage, queued count, queued wait times by allowing the
client to specify a metric name.

Differential Revision: D63728285
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Oct 1, 2024
1 parent bfb8ebe commit 07da8d5
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 0 deletions.
1 change: 1 addition & 0 deletions velox/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ velox_add_library(
BitUtil.cpp
Counters.cpp
Fs.cpp
GenericAdmissionController.cpp
PeriodicStatsReporter.cpp
RandomUtil.cpp
RawVector.cpp
Expand Down
92 changes: 92 additions & 0 deletions velox/common/base/GenericAdmissionController.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/base/GenericAdmissionController.h"

#include "velox/common/base/Exceptions.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/time/Timer.h"

namespace facebook::velox::common {

void GenericAdmissionController::accept(uint64_t resourceUnits) {
ContinueFuture future;
uint64_t updatedValue = 0;
VELOX_CHECK_LE(
resourceUnits,
config_.maxLimit,
"A single request cannot exceed the max limit");
{
std::lock_guard<std::mutex> l(mtx_);
if (unitsUsed_ + resourceUnits > config_.maxLimit) {
auto [unblockPromise, unblockFuture] = makeVeloxContinuePromiseContract();
Request req;
req.unitsRequested = resourceUnits;
req.promise = std::move(unblockPromise);
queue_.push_back(std::move(req));
future = std::move(unblockFuture);
} else {
updatedValue = unitsUsed_ += resourceUnits;
}
}
if (future.valid()) {
if (!config_.resourceQueuedCountMetric.empty()) {
RECORD_METRIC_VALUE(config_.resourceQueuedCountMetric);
}
uint64_t waitTimeUs{0};
{
MicrosecondTimer timer(&waitTimeUs);
future.wait();
}
if (!config_.resourceQueuedTimeMsMetric.empty()) {
RECORD_HISTOGRAM_METRIC_VALUE(
config_.resourceQueuedTimeMsMetric, waitTimeUs / 1'000);
}
return;
}
// Only upadate if there was no wait, as the releasing thread is responsible
// for updating the metric.
if (!config_.resourceUsageMetric.empty()) {
RECORD_METRIC_VALUE(config_.resourceUsageMetric, updatedValue);
}
}

void GenericAdmissionController::release(uint64_t resourceUnits) {
uint64_t updatedValue = 0;
{
std::lock_guard<std::mutex> l(mtx_);
if (resourceUnits > unitsUsed_) {
LOG(WARNING) << "Releasing more units than have been acquired"
" (resourceUnits > unitsUsed_): "
<< resourceUnits << " > " << unitsUsed_;
}
unitsUsed_ -= resourceUnits;
while (!queue_.empty()) {
auto& request = queue_.front();
if (unitsUsed_ + request.unitsRequested > config_.maxLimit) {
break;
}
unitsUsed_ += request.unitsRequested;
request.promise.setValue();
queue_.pop_front();
}
updatedValue = unitsUsed_;
}
if (!config_.resourceUsageMetric.empty()) {
RECORD_METRIC_VALUE(config_.resourceUsageMetric, updatedValue);
}
}
} // namespace facebook::velox::common
63 changes: 63 additions & 0 deletions velox/common/base/GenericAdmissionController.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <deque>
#include <mutex>
#include "velox/common/future/VeloxPromise.h"

namespace facebook::velox::common {

/// A generic admission controller that can be used to limit the number of
/// resources in use and can log metrics like resource usage, queued count,
/// queued wait times. When a calling thread's request for resources surpasses
/// the set limit, it will be placed in a FIFO queue. The thread must then wait
/// until sufficient resources are freed by other threads, addressing all
/// preceding requests in the queue, before its own request can be granted.
class GenericAdmissionController {
public:
struct Config {
uint64_t maxLimit;
/// The metric name for resource usage. If not set, it will not be reported.
std::string resourceUsageMetric;
/// The metric name for resource queued count. If not set, it will not be
/// reported
std::string resourceQueuedCountMetric;
/// The metric name for resource queued wait time. If not set, it will not
/// be reported
std::string resourceQueuedTimeMsMetric;
};
explicit GenericAdmissionController(Config config) : config_(config) {}

void accept(uint64_t resourceUnits);
void release(uint64_t resourceUnits);

uint64_t currentResourceUsage() const {
std::lock_guard<std::mutex> l(mtx_);
return unitsUsed_;
}

private:
struct Request {
uint64_t unitsRequested;
ContinuePromise promise;
};
Config config_;
mutable std::mutex mtx_;
uint64_t unitsUsed_{0};
std::deque<Request> queue_;
};
} // namespace facebook::velox::common
1 change: 1 addition & 0 deletions velox/common/base/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ add_executable(
ConcurrentCounterTest.cpp
ExceptionTest.cpp
FsTest.cpp
GenericAdmissionControllerTest.cpp
RangeTest.cpp
RawVectorTest.cpp
RuntimeMetricsTest.cpp
Expand Down
75 changes: 75 additions & 0 deletions velox/common/base/tests/GenericAdmissionControllerTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/base/GenericAdmissionController.h"
#include <gtest/gtest.h>
#include <atomic>
#include "velox/common/base/VeloxException.h"
#include "velox/common/base/tests/GTestUtils.h"

using namespace facebook::velox;
namespace facebook::velox::common {
TEST(GenericAdmissionController, basic) {
const uint64_t kLimit = 100000;
GenericAdmissionController::Config config;
config.maxLimit = kLimit;
GenericAdmissionController admissionController(config);
EXPECT_EQ(admissionController.currentResourceUsage(), 0);

admissionController.accept(100);
EXPECT_EQ(admissionController.currentResourceUsage(), 100);

admissionController.accept(100);
EXPECT_EQ(admissionController.currentResourceUsage(), 200);

admissionController.release(100);
EXPECT_EQ(admissionController.currentResourceUsage(), 100);

VELOX_ASSERT_THROW(
admissionController.release(101),
"Cannot release more units than have been acquired");

VELOX_ASSERT_THROW(
admissionController.accept(kLimit + 1),
"A single request cannot exceed the max limit");
}

TEST(GenericAdmissionController, multiThreaded) {
// Ensure that resource usage never exceeds the limit set in the admission
// controller.
const uint64_t kLimit = 10;
std::atomic_uint64_t currentUsage{0};
GenericAdmissionController::Config config;
config.maxLimit = kLimit;
GenericAdmissionController admissionController(config);

std::vector<std::thread> threads;
for (int i = 0; i < 20; i++) {
threads.push_back(std::thread([&]() {
for (int j = 0; j < 1000; j++) {
admissionController.accept(1);
uint64_t curr = currentUsage.fetch_add(1);
ASSERT_LE(curr + 1, kLimit);
currentUsage--;
admissionController.release(1);
}
}));
}
for (auto& thread : threads) {
thread.join();
}
}
} // namespace facebook::velox::common

0 comments on commit 07da8d5

Please sign in to comment.