diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt index 1fc725a124e6f..ef4c31ed865ba 100644 --- a/velox/dwio/common/CMakeLists.txt +++ b/velox/dwio/common/CMakeLists.txt @@ -39,6 +39,7 @@ add_library( ExecutorBarrier.cpp FileSink.cpp FlatMapHelper.cpp + GenericAdmissionController.cpp OnDemandUnitLoader.cpp InputStream.cpp IntDecoder.cpp diff --git a/velox/dwio/common/GenericAdmissionController.cpp b/velox/dwio/common/GenericAdmissionController.cpp new file mode 100644 index 0000000000000..173b9bbf654b1 --- /dev/null +++ b/velox/dwio/common/GenericAdmissionController.cpp @@ -0,0 +1,90 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/common/GenericAdmissionController.h" + +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/StatsReporter.h" +#include "velox/common/time/Timer.h" + +namespace facebook::velox::dwio::common { + +void GenericAdmissionController::accept(uint64_t resourceUnits) { + ContinueFuture future; + uint64_t updatedValue = 0; + VELOX_CHECK_LE( + resourceUnits, + config_.maxLimit, + "A single request cannot exceed the max limit"); + { + std::lock_guard l(mtx); + if (unitsUsed_ + resourceUnits > config_.maxLimit) { + auto [unblockPromise, unblockFuture] = makeVeloxContinuePromiseContract(); + Request req{resourceUnits}; + req.promise = std::move(unblockPromise); + queue_.push_back(std::move(req)); + future = std::move(unblockFuture); + } else { + updatedValue = unitsUsed_ += resourceUnits; + } + } + if (future.valid()) { + if (!config_.resourceQueuedCountMetric.empty()) { + RECORD_METRIC_VALUE(config_.resourceQueuedCountMetric); + } + uint64_t waitTimeUs{0}; + { + MicrosecondTimer timer(&waitTimeUs); + future.wait(); + } + if (!config_.resourceQueuedTimeMsMetric.empty()) { + RECORD_HISTOGRAM_METRIC_VALUE( + config_.resourceQueuedTimeMsMetric, waitTimeUs / 1'000); + } + return; + } + // Only upadate if there was no wait, as the releasing thread is responsible + // for updating the metric. + if (!config_.resourceUsageMetric.empty()) { + RECORD_METRIC_VALUE(config_.resourceUsageMetric, updatedValue); + } +} + +void GenericAdmissionController::release(uint64_t resourceUnits) { + uint64_t updatedValue = 0; + { + std::lock_guard l(mtx); + VELOX_CHECK_LE( + resourceUnits, + unitsUsed_, + "Cannot release more units than have been acquired"); + unitsUsed_ -= resourceUnits; + while (!queue_.empty()) { + auto& request = queue_.front(); + if (unitsUsed_ + request.unitsRequested > config_.maxLimit) { + break; + } + unitsUsed_ += request.unitsRequested; + request.promise.setValue(); + queue_.pop_front(); + } + updatedValue = unitsUsed_; + } + if (!config_.resourceUsageMetric.empty()) { + RECORD_METRIC_VALUE(config_.resourceUsageMetric, updatedValue); + } +} +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/GenericAdmissionController.h b/velox/dwio/common/GenericAdmissionController.h new file mode 100644 index 0000000000000..e711832a035be --- /dev/null +++ b/velox/dwio/common/GenericAdmissionController.h @@ -0,0 +1,63 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include "velox/common/future/VeloxPromise.h" + +namespace facebook::velox::dwio::common { + +/// A generic admission controller that can be used to limit the number of +/// resources in use and can log metrics like resource usage, queued count, +/// queued wait times. When a calling thread's request for resources surpasses +/// the set limit, it will be placed in a FIFO queue. The thread must then wait +/// until sufficient resources are freed by other threads, addressing all +/// preceding requests in the queue, before its own request can be granted. +class GenericAdmissionController { + public: + struct Config { + uint64_t maxLimit; + /// The metric name for resource usage. If not set, it will not be reported. + std::string resourceUsageMetric; + /// The metric name for resource queued count. If not set, it will not be + /// reported + std::string resourceQueuedCountMetric; + /// The metric name for resource queued wait time. If not set, it will not + /// be reported + std::string resourceQueuedTimeMsMetric; + }; + explicit GenericAdmissionController(Config config) : config_(config) {} + + void accept(uint64_t resourceUnits); + void release(uint64_t resourceUnits); + + uint64_t currentResourceUsage() const { + std::lock_guard l(mtx); + return unitsUsed_; + } + + private: + struct Request { + uint64_t unitsRequested; + ContinuePromise promise; + }; + Config config_; + mutable std::mutex mtx; + uint64_t unitsUsed_{0}; + std::deque queue_; +}; +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/tests/CMakeLists.txt b/velox/dwio/common/tests/CMakeLists.txt index 19b7244430ca8..fbea0f716ffc9 100644 --- a/velox/dwio/common/tests/CMakeLists.txt +++ b/velox/dwio/common/tests/CMakeLists.txt @@ -23,6 +23,7 @@ add_executable( DataBufferTests.cpp DecoderUtilTest.cpp ExecutorBarrierTest.cpp + GenericAdmissionControllerTest.cpp OnDemandUnitLoaderTests.cpp LocalFileSinkTest.cpp MemorySinkTest.cpp diff --git a/velox/dwio/common/tests/GenericAdmissionControllerTest.cpp b/velox/dwio/common/tests/GenericAdmissionControllerTest.cpp new file mode 100644 index 0000000000000..4bc5aa73f2ef3 --- /dev/null +++ b/velox/dwio/common/tests/GenericAdmissionControllerTest.cpp @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/dwio/common/GenericAdmissionController.h" +#include +#include +#include "velox/common/base/VeloxException.h" +#include "velox/common/base/tests/GTestUtils.h" + +using namespace facebook::velox; +namespace facebook::velox::dwio::common { +TEST(GenericAdmissionController, basic) { + const uint64_t kLimit = 100000; + GenericAdmissionController::Config config{.maxLimit = kLimit}; + GenericAdmissionController admissionController(config); + EXPECT_EQ(admissionController.currentResourceUsage(), 0); + + admissionController.accept(100); + EXPECT_EQ(admissionController.currentResourceUsage(), 100); + + admissionController.accept(100); + EXPECT_EQ(admissionController.currentResourceUsage(), 200); + + admissionController.release(100); + EXPECT_EQ(admissionController.currentResourceUsage(), 100); + + VELOX_ASSERT_THROW( + admissionController.release(101), + "Cannot release more units than have been acquired"); + + VELOX_ASSERT_THROW( + admissionController.accept(kLimit + 1), + "A single request cannot exceed the max limit"); +} + +TEST(GenericAdmissionController, multiThreaded) { + // Ensure that resource usage never exceeds the limit set in the admission + // controller. + const uint64_t kLimit = 10; + std::atomic_uint64_t currentUsage{0}; + GenericAdmissionController::Config config{.maxLimit = 10}; + GenericAdmissionController admissionController(config); + + std::vector threads; + for (int i = 0; i < 20; i++) { + threads.push_back(std::thread([&]() { + for (int j = 0; j < 1000; j++) { + admissionController.accept(1); + uint64_t curr = currentUsage.fetch_add(1); + ASSERT_LE(curr + 1, kLimit); + currentUsage--; + admissionController.release(1); + } + })); + } + for (auto& thread : threads) { + thread.join(); + } +} +} // namespace facebook::velox::dwio::common