diff --git a/velox/common/base/AdmissionController.cpp b/velox/common/base/AdmissionController.cpp new file mode 100644 index 000000000000..c7e1b71ea5ac --- /dev/null +++ b/velox/common/base/AdmissionController.cpp @@ -0,0 +1,91 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/AdmissionController.h" + +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/StatsReporter.h" +#include "velox/common/time/Timer.h" + +namespace facebook::velox::common { + +void AdmissionController::accept(uint64_t resourceUnits) { + ContinueFuture future; + uint64_t updatedValue = 0; + VELOX_CHECK_LE( + resourceUnits, + config_.maxLimit, + "A single request cannot exceed the max limit"); + { + std::lock_guard l(mu_); + if (unitsUsed_ + resourceUnits > config_.maxLimit) { + auto [unblockPromise, unblockFuture] = makeVeloxContinuePromiseContract(); + Request req; + req.unitsRequested = resourceUnits; + req.promise = std::move(unblockPromise); + queue_.push_back(std::move(req)); + future = std::move(unblockFuture); + } else { + updatedValue = unitsUsed_ += resourceUnits; + } + } + if (!future.valid()) { + // Only upadate if there was no wait, as the releasing thread is responsible + // for updating the metric. + if (!config_.resourceUsageAvgMetric.empty()) { + RECORD_METRIC_VALUE(config_.resourceUsageAvgMetric, updatedValue); + } + return; + } + if (!config_.resourceQueuedCountMetric.empty()) { + RECORD_METRIC_VALUE(config_.resourceQueuedCountMetric); + } + uint64_t waitTimeUs{0}; + { + MicrosecondTimer timer(&waitTimeUs); + future.wait(); + } + if (!config_.resourceQueuedTimeMsHistogramMetric.empty()) { + RECORD_HISTOGRAM_METRIC_VALUE( + config_.resourceQueuedTimeMsHistogramMetric, waitTimeUs / 1'000); + } +} + +void AdmissionController::release(uint64_t resourceUnits) { + uint64_t updatedValue = 0; + { + std::lock_guard l(mu_); + VELOX_CHECK_LE( + resourceUnits, + unitsUsed_, + "Cannot release more units than have been acquired"); + unitsUsed_ -= resourceUnits; + while (!queue_.empty()) { + auto& request = queue_.front(); + if (unitsUsed_ + request.unitsRequested > config_.maxLimit) { + break; + } + unitsUsed_ += request.unitsRequested; + request.promise.setValue(); + queue_.pop_front(); + } + updatedValue = unitsUsed_; + } + if (!config_.resourceUsageAvgMetric.empty()) { + RECORD_METRIC_VALUE(config_.resourceUsageAvgMetric, updatedValue); + } +} +} // namespace facebook::velox::common diff --git a/velox/common/base/AdmissionController.h b/velox/common/base/AdmissionController.h new file mode 100644 index 000000000000..19945733698d --- /dev/null +++ b/velox/common/base/AdmissionController.h @@ -0,0 +1,67 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include "velox/common/future/VeloxPromise.h" + +namespace facebook::velox::common { + +/// A generic admission controller that can be used to limit the number of +/// resources in use and can log metrics like resource usage, queued count, +/// queued wait times. When a calling thread's request for resources surpasses +/// the set limit, it will be placed in a FIFO queue. The thread must then wait +/// until sufficient resources are freed by other threads, addressing all +/// preceding requests in the queue, before its own request can be granted. +class AdmissionController { + public: + struct Config { + /// The maximum number of resource units that can be used at any given time. + /// Set to a default value of max unit64 to signify unlimited limit. + uint64_t maxLimit{std::numeric_limits::max()}; + /// The metric name for resource usage. If not set, it will not be reported. + /// Should be a registered as a average metric. + std::string resourceUsageAvgMetric; + /// The metric name for resource queued count. If not set, it will not be + /// reported. Should be a registered as a count metric. + std::string resourceQueuedCountMetric; + /// The metric name for resource queued wait time. If not set, it will not + /// be reported. Should be a registered as a histogram metric. + std::string resourceQueuedTimeMsHistogramMetric; + }; + explicit AdmissionController(const Config& config) : config_(config) {} + + // Accept can block until sufficient resources are freed by other threads. + void accept(uint64_t resourceUnits); + void release(uint64_t resourceUnits); + + uint64_t currentResourceUsage() const { + std::lock_guard l(mu_); + return unitsUsed_; + } + + private: + struct Request { + uint64_t unitsRequested; + ContinuePromise promise; + }; + Config config_; + mutable std::mutex mu_; + uint64_t unitsUsed_{0}; + std::deque queue_; +}; +} // namespace facebook::velox::common diff --git a/velox/common/base/CMakeLists.txt b/velox/common/base/CMakeLists.txt index 6c26222ef56c..cfbfdc90ea66 100644 --- a/velox/common/base/CMakeLists.txt +++ b/velox/common/base/CMakeLists.txt @@ -25,6 +25,7 @@ velox_link_libraries( velox_add_library( velox_common_base + AdmissionController.cpp BitUtil.cpp Counters.cpp Fs.cpp diff --git a/velox/common/base/tests/AdmissionControllerTest.cpp b/velox/common/base/tests/AdmissionControllerTest.cpp new file mode 100644 index 000000000000..9c1a786d7bd0 --- /dev/null +++ b/velox/common/base/tests/AdmissionControllerTest.cpp @@ -0,0 +1,82 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/AdmissionController.h" +#include +#include +#include "velox/common/base/VeloxException.h" +#include "velox/common/base/tests/GTestUtils.h" + +using namespace facebook::velox; +namespace facebook::velox::common { +TEST(AdmissionController, basic) { + const uint64_t kLimit = 100000; + AdmissionController::Config config; + config.maxLimit = kLimit; + AdmissionController admissionController(config); + EXPECT_EQ(admissionController.currentResourceUsage(), 0); + + admissionController.accept(100); + EXPECT_EQ(admissionController.currentResourceUsage(), 100); + + admissionController.accept(100); + EXPECT_EQ(admissionController.currentResourceUsage(), 200); + + admissionController.release(100); + EXPECT_EQ(admissionController.currentResourceUsage(), 100); + + VELOX_ASSERT_THROW( + admissionController.release(101), + "Cannot release more units than have been acquired"); + + VELOX_ASSERT_THROW( + admissionController.accept(kLimit + 1), + "A single request cannot exceed the max limit"); +} + +TEST(AdmissionController, multiThreaded) { + // Ensure that resource usage never exceeds the limit set in the admission + // controller. + const uint64_t kLimit = 30; + std::atomic_uint64_t currentUsage{0}; + AdmissionController::Config config; + config.maxLimit = kLimit; + AdmissionController admissionController(config); + + std::vector threads; + for (int i = 0; i < 20; i++) { + threads.push_back(std::thread([&]() { + for (int j = 0; j < 10'000; j++) { + uint64_t usageUnits = std::rand() % (kLimit + 2); + if (usageUnits > kLimit) { + VELOX_ASSERT_THROW( + admissionController.accept(usageUnits), + "A single request cannot exceed the max limit") + continue; + } + admissionController.accept(usageUnits); + uint64_t curr = currentUsage.fetch_add(usageUnits); + ASSERT_LE(curr + usageUnits, kLimit); + currentUsage.fetch_sub(usageUnits); + admissionController.release(usageUnits); + } + })); + } + for (auto& thread : threads) { + thread.join(); + } +} +} // namespace facebook::velox::common diff --git a/velox/common/base/tests/CMakeLists.txt b/velox/common/base/tests/CMakeLists.txt index 51fa95a14f95..9a10ec270b86 100644 --- a/velox/common/base/tests/CMakeLists.txt +++ b/velox/common/base/tests/CMakeLists.txt @@ -14,6 +14,7 @@ add_executable( velox_base_test + AdmissionControllerTest.cpp AsyncSourceTest.cpp BitUtilTest.cpp BloomFilterTest.cpp