diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index ea88eca40d83..368e409e23d6 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -35,11 +35,9 @@ import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.SystemPropertyKeys; -import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; -import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.broker.broker.AccessControlFactory; @@ -50,7 +48,7 @@ import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate; import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler; import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler; -import org.apache.pinot.broker.requesthandler.MultiStageQuerySemaphore; +import org.apache.pinot.broker.requesthandler.MultiStageQueryThrottler; import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler; import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler; import org.apache.pinot.broker.routing.BrokerRoutingManager; @@ -140,7 +138,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { // Handles the server routing stats. protected ServerRoutingStatsManager _serverRoutingStatsManager; protected HelixExternalViewBasedQueryQuotaManager _queryQuotaManager; - protected MultiStageQuerySemaphore _multiStageQuerySemaphore; + protected MultiStageQueryThrottler _multiStageQueryThrottler; @Override public void init(PinotConfiguration brokerConf) @@ -339,26 +337,15 @@ public void start() MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null; QueryDispatcher queryDispatcher = null; if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) { - int numBrokers = Math.max(1, (int) _helixAdmin - .getInstancesInCluster(_spectatorHelixManager.getClusterName()) - .stream() - .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) - .count()); - int maxConcurrentQueries = Integer.parseInt( - _helixAdmin.getConfig(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( - _spectatorHelixManager.getClusterName()).build(), - Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)) - .getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, - Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES)); - _multiStageQuerySemaphore = new MultiStageQuerySemaphore(numBrokers, maxConcurrentQueries); - _multiStageQuerySemaphore.init(_spectatorHelixManager); + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_spectatorHelixManager); // multi-stage request handler uses both Netty and GRPC ports. // worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport. // TODO: decouple protocol and engine selection. queryDispatcher = createQueryDispatcher(_brokerConf); multiStageBrokerRequestHandler = new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, - _queryQuotaManager, tableCache, _multiStageQuerySemaphore); + _queryQuotaManager, tableCache, _multiStageQueryThrottler); } TimeSeriesRequestHandler timeSeriesRequestHandler = null; if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) { @@ -397,7 +384,7 @@ public void start() clusterConfigChangeHandler.init(_spectatorHelixManager); } _clusterConfigChangeHandlers.add(_queryQuotaManager); - _clusterConfigChangeHandlers.add(_multiStageQuerySemaphore); + _clusterConfigChangeHandlers.add(_multiStageQueryThrottler); for (ClusterChangeHandler idealStateChangeHandler : _idealStateChangeHandlers) { idealStateChangeHandler.init(_spectatorHelixManager); } @@ -407,7 +394,7 @@ public void start() } _externalViewChangeHandlers.add(_routingManager); _externalViewChangeHandlers.add(_queryQuotaManager); - _externalViewChangeHandlers.add(_multiStageQuerySemaphore); + _externalViewChangeHandlers.add(_multiStageQueryThrottler); for (ClusterChangeHandler instanceConfigChangeHandler : _instanceConfigChangeHandlers) { instanceConfigChangeHandler.init(_spectatorHelixManager); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 7940b69d960b..6ad98dea3488 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -89,11 +89,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { private final WorkerManager _workerManager; private final QueryDispatcher _queryDispatcher; private final boolean _explainAskingServerDefault; - private final MultiStageQuerySemaphore _querySemaphore; + private final MultiStageQueryThrottler _queryThrottler; public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, - MultiStageQuerySemaphore querySemaphore) { + MultiStageQueryThrottler queryThrottler) { super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache); String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME); int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT)); @@ -109,7 +109,7 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId _explainAskingServerDefault = _config.getProperty( CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN, CommonConstants.MultiStageQueryRunner.DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN); - _querySemaphore = querySemaphore; + _queryThrottler = queryThrottler; } @Override @@ -231,7 +231,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO try { // It's fine to block in this thread because we use a separate thread pool from the main Jersey server to process // these requests. - if (!_querySemaphore.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) { + if (!_queryThrottler.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) { LOGGER.warn("Timed out waiting to execute request {}: {}", requestId, query); requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE); return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); @@ -308,7 +308,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO return brokerResponse; } finally { - _querySemaphore.release(); + _queryThrottler.release(); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQuerySemaphore.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java similarity index 50% rename from pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQuerySemaphore.java rename to pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java index d845d419d736..e5a69eee6bb3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQuerySemaphore.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java @@ -18,9 +18,10 @@ */ package org.apache.pinot.broker.requesthandler; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Collections; -import java.util.concurrent.Semaphore; +import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixConstants; @@ -28,6 +29,7 @@ import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.pinot.broker.broker.helix.ClusterChangeHandler; +import org.apache.pinot.common.concurrency.AdjustableSemaphore; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,84 +37,81 @@ /** * This class helps limit the number of multi-stage queries being executed concurrently. Currently, this limit is - * applied at the broker level, but could be moved to the server level in the future. + * applied at the broker level, but could be moved to the server level in the future. Note that the cluster + * configuration is a "per server" value and the broker currently simply assumes that a query will be across all + * servers. Another assumption here is that queries are evenly distributed across brokers. Ideally, we want to move to a + * model where the broker asks each server whether it can execute a query stage before dispatching the query stage to + * the server. This would allow for more fine-grained control over the number of queries being executed concurrently + * (but there are some limitations around ordering and blocking that need to be solved first). */ -public class MultiStageQuerySemaphore extends Semaphore implements ClusterChangeHandler { +public class MultiStageQueryThrottler implements ClusterChangeHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageQuerySemaphore.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageQueryThrottler.class); private HelixManager _helixManager; private HelixAdmin _helixAdmin; private HelixConfigScope _helixConfigScope; private int _numBrokers; + private int _numServers; + // If _maxConcurrentQueries is <= 0, it means that the cluster is not configured to limit the number of multi-stage + // queries that can be executed concurrently. In this case, we should not block the query. private int _maxConcurrentQueries; - private int _totalPermits; - - public MultiStageQuerySemaphore(int numBrokers, int maxConcurrentQueries) { - super(Math.max(1, maxConcurrentQueries / Math.max(numBrokers, 1)), true); - _maxConcurrentQueries = maxConcurrentQueries; - _numBrokers = Math.max(1, numBrokers); - _totalPermits = - maxConcurrentQueries > 0 ? Math.max(1, maxConcurrentQueries / Math.max(numBrokers, 1)) : maxConcurrentQueries; - } - - @Override - public void acquire() - throws InterruptedException { - // If _totalPermits is <= 0, it means that the cluster is not configured to limit the number of multi-stage queries - // that can be executed concurrently. In this case, we should not block the query. - if (_totalPermits > 0) { - super.acquire(); - } - } + private AdjustableSemaphore _semaphore; @Override - public void acquireUninterruptibly() { - // If _totalPermits is <= 0, it means that the cluster is not configured to limit the number of multi-stage queries - // that can be executed concurrently. In this case, we should not block the query. - if (_totalPermits > 0) { - super.acquireUninterruptibly(); - } - } + public void init(HelixManager helixManager) { + _helixManager = helixManager; + _helixAdmin = _helixManager.getClusterManagmentTool(); + _helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( + _helixManager.getClusterName()).build(); - @Override - public boolean tryAcquire() { - // If _totalPermits is <= 0, it means that the cluster is not configured to limit the number of multi-stage queries - // that can be executed concurrently. In this case, we should not block the query. - if (_totalPermits > 0) { - return super.tryAcquire(); - } else { - return true; + _maxConcurrentQueries = Integer.parseInt( + _helixAdmin.getConfig(_helixConfigScope, + Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)) + .getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, + CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES)); + + List clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName()); + _numBrokers = Math.max(1, (int) clusterInstances.stream() + .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) + .count()); + _numServers = Math.max(1, (int) clusterInstances.stream() + .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) + .count()); + + if (_maxConcurrentQueries > 0) { + _semaphore = new AdjustableSemaphore(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers), true); } } - @Override + /** + * Returns true if the query can be executed (waiting until it can be executed if necessary), false otherwise. + *

+ * {@link #release()} should be called after the query is done executing. It is the responsibility of the caller to + * ensure that {@link #release()} is called exactly once for each call to this method. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @throws InterruptedException if the current thread is interrupted + */ public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { - // If _totalPermits is <= 0, it means that the cluster is not configured to limit the number of multi-stage queries - // that can be executed concurrently. In this case, we should not block the query. - if (_totalPermits > 0) { - return super.tryAcquire(timeout, unit); - } else { + if (_maxConcurrentQueries <= 0) { return true; } + return _semaphore.tryAcquire(timeout, unit); } - @Override + /** + * Should be called after the query is done executing. It is the responsibility of the caller to ensure that this + * method is called exactly once for each call to {@link #tryAcquire(long, TimeUnit)}. + */ public void release() { - if (_totalPermits > 0) { - super.release(); + if (_maxConcurrentQueries > 0) { + _semaphore.release(); } } - @Override - public void init(HelixManager helixManager) { - _helixManager = helixManager; - _helixAdmin = _helixManager.getClusterManagmentTool(); - _helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( - _helixManager.getClusterName()).build(); - } - @Override public void processClusterChange(HelixConstants.ChangeType changeType) { Preconditions.checkArgument( @@ -120,23 +119,19 @@ public void processClusterChange(HelixConstants.ChangeType changeType) { "MultiStageQuerySemaphore can only handle EXTERNAL_VIEW and CLUSTER_CONFIG changes"); if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) { - int numBrokers = - Math.max(1, (int) _helixAdmin - .getInstancesInCluster(_helixManager.getClusterName()) - .stream() - .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) - .count()); - - if (numBrokers != _numBrokers) { + List clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName()); + int numBrokers = Math.max(1, (int) clusterInstances.stream() + .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) + .count()); + int numServers = Math.max(1, (int) clusterInstances.stream() + .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) + .count()); + + if (numBrokers != _numBrokers || numServers != _numServers) { _numBrokers = numBrokers; + _numServers = numServers; if (_maxConcurrentQueries > 0) { - int newTotalPermits = Math.max(1, _maxConcurrentQueries / _numBrokers); - if (newTotalPermits > _totalPermits) { - release(newTotalPermits - _totalPermits); - } else if (newTotalPermits < _totalPermits) { - reducePermits(_totalPermits - newTotalPermits); - } - _totalPermits = newTotalPermits; + _semaphore.setPermits(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers)); } } } else { @@ -160,15 +155,14 @@ public void processClusterChange(HelixConstants.ChangeType changeType) { } if (maxConcurrentQueries > 0) { - int newTotalPermits = Math.max(1, maxConcurrentQueries / _numBrokers); - if (newTotalPermits > _totalPermits) { - release(newTotalPermits - _totalPermits); - } else if (newTotalPermits < _totalPermits) { - reducePermits(_totalPermits - newTotalPermits); - } - _totalPermits = newTotalPermits; + _semaphore.setPermits(Math.max(1, maxConcurrentQueries * _numServers / _numBrokers)); } _maxConcurrentQueries = maxConcurrentQueries; } } + + @VisibleForTesting + int availablePermits() { + return _semaphore.availablePermits(); + } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQuerySemaphoreTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQuerySemaphoreTest.java deleted file mode 100644 index 7ce063b91208..000000000000 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQuerySemaphoreTest.java +++ /dev/null @@ -1,273 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.broker.requesthandler; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixConstants; -import org.apache.helix.HelixManager; -import org.apache.pinot.spi.utils.CommonConstants; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.when; - - -public class MultiStageQuerySemaphoreTest { - - private AutoCloseable _mocks; - @Mock - private HelixManager _helixManager; - @Mock - private HelixAdmin _helixAdmin; - - @BeforeMethod - public void setUp() { - _mocks = MockitoAnnotations.openMocks(this); - when(_helixManager.getClusterManagmentTool()).thenReturn(_helixAdmin); - when(_helixManager.getClusterName()).thenReturn("testCluster"); - } - - @AfterMethod - public void tearDown() - throws Exception { - _mocks.close(); - } - - @Test - public void testBasicAcquireRelease() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(2, 10); - semaphore.tryAcquire(100, TimeUnit.MILLISECONDS); - Assert.assertEquals(semaphore.availablePermits(), 4); - semaphore.release(); - Assert.assertEquals(semaphore.availablePermits(), 5); - } - - @Test - public void testAcquireTimeout() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(2, 4); - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 1); - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - } - - @Test - public void testDisabledSemaphore() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(1, -1); - - // If maxConcurrentQueries is <= 0, semaphore should be "disabled" and any attempt to acquire should succeed - for (int i = 0; i < 100; i++) { - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - } - } - - @Test - public void testIncreaseNumBrokers() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(2, 4); - semaphore.init(_helixManager); - - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 1); - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - - // Increase the number of brokers - when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn( - List.of("Broker_0", "Broker_1", "Broker_2", "Broker_3")); - semaphore.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW); - - Assert.assertEquals(semaphore.availablePermits(), -1); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - semaphore.release(); - Assert.assertEquals(semaphore.availablePermits(), 0); - semaphore.release(); - Assert.assertEquals(semaphore.availablePermits(), 1); - } - - @Test - public void testDecreaseNumBrokers() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(2, 4); - semaphore.init(_helixManager); - - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 1); - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - - // Decrease the number of brokers - when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(List.of("Broker_0")); - semaphore.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW); - - Assert.assertEquals(semaphore.availablePermits(), 2); - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - Assert.assertEquals(semaphore.availablePermits(), 1); - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - - semaphore.release(); - semaphore.release(); - semaphore.release(); - semaphore.release(); - Assert.assertEquals(semaphore.availablePermits(), 4); - } - - @Test - public void testIncreaseMaxConcurrentQueries() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(2, 4); - semaphore.init(_helixManager); - - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 1); - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - - // Increase the value of cluster config maxConcurrentQueries - when(_helixAdmin.getConfig(any(), - eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))) - .thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "10")); - semaphore.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG); - - Assert.assertEquals(semaphore.availablePermits(), 3); - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - Assert.assertEquals(semaphore.availablePermits(), 2); - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - Assert.assertEquals(semaphore.availablePermits(), 1); - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - - semaphore.release(); - semaphore.release(); - semaphore.release(); - semaphore.release(); - semaphore.release(); - Assert.assertEquals(semaphore.availablePermits(), 5); - } - - @Test - public void testDecreaseMaxConcurrentQueries() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(2, 4); - semaphore.init(_helixManager); - - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 1); - semaphore.acquire(); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - - // Decrease the value of cluster config maxConcurrentQueries - when(_helixAdmin.getConfig(any(), - eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))) - ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "2")); - semaphore.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG); - - Assert.assertEquals(semaphore.availablePermits(), -1); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - semaphore.release(); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - semaphore.release(); - Assert.assertEquals(semaphore.availablePermits(), 1); - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - } - - @Test - public void testEnabledToDisabledTransitionDisallowed() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(2, 4); - semaphore.init(_helixManager); - - Assert.assertEquals(semaphore.availablePermits(), 2); - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - Assert.assertEquals(semaphore.availablePermits(), 1); - - // Disable the semaphore via cluster config change - when(_helixAdmin.getConfig(any(), - eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))) - ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1")); - semaphore.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG); - - // Should not be allowed to disable the semaphore if it is enabled during startup - Assert.assertEquals(semaphore.availablePermits(), 1); - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - } - - @Test - public void testDisabledToEnabledTransitionDisallowed() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(1, -1); - semaphore.init(_helixManager); - - // If maxConcurrentQueries is <= 0, semaphore should be "disabled" and any attempt to acquire should succeed - for (int i = 0; i < 100; i++) { - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - } - - // Enable the semaphore via cluster config change - when(_helixAdmin.getConfig(any(), - eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))) - ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "4")); - semaphore.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG); - - // Should not be allowed to enable the semaphore if it is disabled during startup - for (int i = 0; i < 100; i++) { - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - } - for (int i = 0; i < 100; i++) { - semaphore.acquire(); - } - } - - @Test - public void testMaxConcurrentQueriesSmallerThanNumBrokers() - throws Exception { - MultiStageQuerySemaphore semaphore = new MultiStageQuerySemaphore(4, 2); - semaphore.init(_helixManager); - - // The total permits should be capped at 1 even though maxConcurrentQueries / numBrokers is 0. - Assert.assertEquals(semaphore.availablePermits(), 1); - Assert.assertTrue(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - Assert.assertEquals(semaphore.availablePermits(), 0); - Assert.assertFalse(semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)); - } -} diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java new file mode 100644 index 000000000000..fe2a5a124006 --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixManager; +import org.apache.pinot.spi.utils.CommonConstants; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + + +public class MultiStageQueryThrottlerTest { + + private AutoCloseable _mocks; + @Mock + private HelixManager _helixManager; + @Mock + private HelixAdmin _helixAdmin; + private MultiStageQueryThrottler _multiStageQueryThrottler; + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + when(_helixManager.getClusterManagmentTool()).thenReturn(_helixAdmin); + when(_helixManager.getClusterName()).thenReturn("testCluster"); + when(_helixAdmin.getConfig(any(), any())).thenReturn( + Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "4")); + when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn( + List.of("Broker_0", "Broker_1", "Server_0", "Server_1")); + } + + @AfterMethod + public void tearDown() + throws Exception { + _mocks.close(); + } + + @Test + public void testBasicAcquireRelease() + throws Exception { + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3); + _multiStageQueryThrottler.release(); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4); + } + + @Test + public void testAcquireTimeout() + throws Exception { + when(_helixAdmin.getConfig(any(), + eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))).thenReturn( + Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "2")); + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1); + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0); + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + + @Test + public void testDisabledThrottling() + throws Exception { + when(_helixAdmin.getConfig(any(), any())).thenReturn( + Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1")); + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + // If maxConcurrentQueries is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should + // succeed + for (int i = 0; i < 100; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + } + + @Test + public void testIncreaseNumBrokers() + throws Exception { + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + for (int i = 0; i < 4; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0); + + // Increase the number of brokers + when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn( + List.of("Broker_0", "Broker_1", "Broker_2", "Broker_3", "Server_0", "Server_1")); + _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW); + + // Verify that the number of permits on this broker have been reduced to account for the new brokers + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -2); + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + + for (int i = 0; i < 4; i++) { + _multiStageQueryThrottler.release(); + } + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2); + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + + @Test + public void testDecreaseNumBrokers() + throws Exception { + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + for (int i = 0; i < 4; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0); + + // Decrease the number of brokers + when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(List.of("Broker_0", "Server_0", "Server_1")); + _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW); + + // Ensure that the permits from the removed broker are added to this one. + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4); + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3); + } + + @Test + public void testIncreaseNumServers() + throws Exception { + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + for (int i = 0; i < 4; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0); + + // Increase the number of servers + when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn( + List.of("Broker_0", "Broker_1", "Server_0", "Server_1", "Server_2")); + _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW); + + // Ensure that the permits on this broker are increased to account for the new server + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2); + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1); + } + + @Test + public void testDecreaseNumServers() + throws Exception { + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + for (int i = 0; i < 4; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0); + + // Decrease the number of servers + when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(List.of("Broker_0", "Broker_1", "Server_0")); + _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW); + + // Verify that the number of permits on this broker have been reduced to account for the removed server + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -2); + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + + for (int i = 0; i < 4; i++) { + _multiStageQueryThrottler.release(); + } + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2); + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + + @Test + public void testIncreaseMaxConcurrentQueries() + throws Exception { + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + for (int i = 0; i < 4; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0); + + // Increase the value of cluster config maxConcurrentQueries + when(_helixAdmin.getConfig(any(), + eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))) + .thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "8")); + _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG); + + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4); + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + + @Test + public void testDecreaseMaxConcurrentQueries() + throws Exception { + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + for (int i = 0; i < 4; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0); + + // Decrease the value of cluster config maxConcurrentQueries + when(_helixAdmin.getConfig(any(), + eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))) + ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "3")); + _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG); + + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -1); + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + + for (int i = 0; i < 4; i++) { + _multiStageQueryThrottler.release(); + } + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3); + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + + @Test + public void testEnabledToDisabledTransitionDisallowed() + throws Exception { + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4); + + // Disable the throttling mechanism via cluster config change + when(_helixAdmin.getConfig(any(), + eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))) + ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1")); + _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG); + + // Should not be allowed to disable the throttling mechanism if it is enabled during startup + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4); + + for (int i = 0; i < 4; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0); + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + + @Test + public void testDisabledToEnabledTransitionDisallowed() + throws Exception { + when(_helixAdmin.getConfig(any(), + eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))) + ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1")); + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + // If maxConcurrentQueries is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should + // succeed + for (int i = 0; i < 100; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + + // Enable the throttling mechanism via cluster config change + when(_helixAdmin.getConfig(any(), + eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))) + ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "4")); + _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG); + + // Should not be allowed to enable the throttling mechanism if it is disabled during startup + for (int i = 0; i < 100; i++) { + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } + } + + @Test + public void testMaxConcurrentQueriesSmallerThanNumBrokers() + throws Exception { + when(_helixAdmin.getConfig(any(), + eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))) + ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "2")); + when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn( + List.of("Broker_0", "Broker_1", "Broker_2", "Broker_3", "Server_0", "Server_1")); + _multiStageQueryThrottler = new MultiStageQueryThrottler(); + _multiStageQueryThrottler.init(_helixManager); + + // The total permits should be capped at 1 even though maxConcurrentQueries * numServers / numBrokers is 0. + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1); + Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0); + Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS)); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java b/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java new file mode 100644 index 000000000000..08751c730302 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.concurrency; + +import java.util.concurrent.Semaphore; + + +/** + * A semaphore that allows adjusting the number of permits in a non-blocking way. + */ +public class AdjustableSemaphore extends Semaphore { + + private int _totalPermits; + + public AdjustableSemaphore(int permits) { + super(permits); + _totalPermits = permits; + } + + public AdjustableSemaphore(int permits, boolean fair) { + super(permits, fair); + _totalPermits = permits; + } + + public void setPermits(int permits) { + if (permits < _totalPermits) { + reducePermits(_totalPermits - permits); + } else if (permits > _totalPermits) { + release(permits - _totalPermits); + } + _totalPermits = permits; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 40bd86cc0425..67bd6191c38c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -237,8 +237,9 @@ public static class Instance { public static final String CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED = "pinot.multistage.engine.tls.enabled"; public static final boolean DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED = false; + // This is a "beta" config and can be changed or even removed in future releases. public static final String CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES = - "pinot.multistage.engine.max.concurrent.queries"; + "pinot.beta.multistage.engine.max.server.concurrent.queries"; public static final String DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES = "-1"; }