diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java index 926de0699d76..427065b76eec 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java @@ -153,7 +153,7 @@ public void testNumInFlightReqSelector() { } for (int ii = 0; ii < 10; ii++) { for (String server : _servers) { - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); } } @@ -187,7 +187,7 @@ public void testNumInFlightReqSelector() { for (int ii = 0; ii < _servers.size(); ii++) { for (int jj = 0; jj < ii; jj++) { - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(ii)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(ii)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); } } @@ -232,15 +232,15 @@ public void testNumInFlightReqSelector() { numInflightReqMap.put("server2", 11); numInflightReqMap.put("server3", 15); numInflightReqMap.put("server4", 13); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(0)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(0)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(0)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(0)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(2)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(2)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(2)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(2)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(2)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(2)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); serverRankingWithVal = selector.fetchAllServerRankingsWithScores(); @@ -290,7 +290,7 @@ public void testNumInFlightReqSelector() { // Route the request to the best server. selectedServer = serverRankingWithVal.get(0).getLeft(); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, selectedServer); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, selectedServer); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); int numReq = numInflightReqMap.get(selectedServer) + 1; numInflightReqMap.put(selectedServer, numReq); @@ -484,7 +484,7 @@ public void testHybridSelector() { // TEST 2: Populate all servers with equal numInFlightRequests and latencies. for (int ii = 0; ii < 10; ii++) { for (String server : _servers) { - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); } } @@ -571,7 +571,7 @@ public void testHybridSelector() { // Route the request to the best server. selectedServer = serverRankingWithVal.get(0).getLeft(); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, selectedServer); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, selectedServer); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); if (rand.nextBoolean()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java index f03509fb541a..7bcc90b50da4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java @@ -56,13 +56,17 @@ public AsyncQueryResponse(QueryRouter queryRouter, long requestId, Set(HashUtil.getHashMapCapacity(numServersQueried)); + _serverRoutingStatsManager = serverRoutingStatsManager; for (ServerRoutingInstance serverRoutingInstance : serversQueried) { + // Record stats related to query submission just before sending the request. Otherwise, if the response is + // received immediately, there's a possibility of updating query response stats before updating query + // submission stats. + _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRoutingInstance.getInstanceId()); _responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs)); } _countDownLatch = new CountDownLatch(numServersQueried); _timeoutMs = timeoutMs; _maxEndTimeMs = startTimeMs + timeoutMs; - _serverRoutingStatsManager = serverRoutingStatsManager; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index 086211ad5f06..e5b96840e78e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -126,10 +126,6 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, ServerRoutingInstance serverRoutingInstance = entry.getKey(); ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels; try { - // Record stats related to query submission just before sending the request. Otherwise, if the response is - // received immediately, there's a possibility of updating query response stats before updating query - // submission stats. - _serverRoutingStatsManager.recordStatsAfterQuerySubmission(requestId, serverRoutingInstance.getInstanceId()); serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(), timeoutMs); asyncQueryResponse.markRequestSubmitted(serverRoutingInstance); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java index 1057fc084f1d..a21906d23b82 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java @@ -135,9 +135,9 @@ public long getCompletedTaskCount() { } /** - * Called when a query is submitted to a server. Updates stats corresponding to query submission. + * Called just before submitting a query to a server. Updates stats corresponding to query submission. */ - public void recordStatsAfterQuerySubmission(long requestId, String serverInstanceId) { + public void recordStatsForQuerySubmission(long requestId, String serverInstanceId) { if (!_isEnabled) { return; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java index 84983f87ec82..1ef3022a0997 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java @@ -131,7 +131,7 @@ public void testQuerySubmitAndCompletionStats() { int requestId = 0; // Submit stats for server1. - manager.recordStatsAfterQuerySubmission(requestId++, "server1"); + manager.recordStatsForQuerySubmission(requestId++, "server1"); waitForStatsUpdate(manager, requestId); List> numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers(); @@ -156,7 +156,7 @@ public void testQuerySubmitAndCompletionStats() { assertEquals(score, 0.0); // Submit more stats for server 1. - manager.recordStatsAfterQuerySubmission(requestId++, "server1"); + manager.recordStatsForQuerySubmission(requestId++, "server1"); waitForStatsUpdate(manager, requestId); numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers(); @@ -181,7 +181,7 @@ public void testQuerySubmitAndCompletionStats() { assertEquals(score, 0.0); // Add a new server server2. - manager.recordStatsAfterQuerySubmission(requestId++, "server2"); + manager.recordStatsForQuerySubmission(requestId++, "server2"); waitForStatsUpdate(manager, requestId);