Skip to content

Commit

Permalink
Reposition query submission spot for adaptive server selection (apach…
Browse files Browse the repository at this point in the history
…e#13327)

* Refactor ADSS querysubmission stats to avoid missing servers

* Address review comments.
  • Loading branch information
vvivekiyer authored Jun 8, 2024
1 parent 70412e6 commit 61aa6ce
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void testNumInFlightReqSelector() {
}
for (int ii = 0; ii < 10; ii++) {
for (String server : _servers) {
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server);
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server);
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
}
}
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testNumInFlightReqSelector() {

for (int ii = 0; ii < _servers.size(); ii++) {
for (int jj = 0; jj < ii; jj++) {
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(ii));
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(ii));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
}
}
Expand Down Expand Up @@ -232,15 +232,15 @@ public void testNumInFlightReqSelector() {
numInflightReqMap.put("server2", 11);
numInflightReqMap.put("server3", 15);
numInflightReqMap.put("server4", 13);
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(0));
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(0));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(0));
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(0));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(2));
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(2));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(2));
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(2));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(2));
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(2));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);

serverRankingWithVal = selector.fetchAllServerRankingsWithScores();
Expand Down Expand Up @@ -290,7 +290,7 @@ public void testNumInFlightReqSelector() {

// Route the request to the best server.
selectedServer = serverRankingWithVal.get(0).getLeft();
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, selectedServer);
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, selectedServer);
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
int numReq = numInflightReqMap.get(selectedServer) + 1;
numInflightReqMap.put(selectedServer, numReq);
Expand Down Expand Up @@ -484,7 +484,7 @@ public void testHybridSelector() {
// TEST 2: Populate all servers with equal numInFlightRequests and latencies.
for (int ii = 0; ii < 10; ii++) {
for (String server : _servers) {
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server);
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server);
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
}
}
Expand Down Expand Up @@ -571,7 +571,7 @@ public void testHybridSelector() {

// Route the request to the best server.
selectedServer = serverRankingWithVal.get(0).getLeft();
serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, selectedServer);
serverRoutingStatsManager.recordStatsForQuerySubmission(-1, selectedServer);
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);

if (rand.nextBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ public AsyncQueryResponse(QueryRouter queryRouter, long requestId, Set<ServerRou
_requestId = requestId;
int numServersQueried = serversQueried.size();
_responseMap = new ConcurrentHashMap<>(HashUtil.getHashMapCapacity(numServersQueried));
_serverRoutingStatsManager = serverRoutingStatsManager;
for (ServerRoutingInstance serverRoutingInstance : serversQueried) {
// Record stats related to query submission just before sending the request. Otherwise, if the response is
// received immediately, there's a possibility of updating query response stats before updating query
// submission stats.
_serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRoutingInstance.getInstanceId());
_responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs));
}
_countDownLatch = new CountDownLatch(numServersQueried);
_timeoutMs = timeoutMs;
_maxEndTimeMs = startTimeMs + timeoutMs;
_serverRoutingStatsManager = serverRoutingStatsManager;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
ServerRoutingInstance serverRoutingInstance = entry.getKey();
ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels;
try {
// Record stats related to query submission just before sending the request. Otherwise, if the response is
// received immediately, there's a possibility of updating query response stats before updating query
// submission stats.
_serverRoutingStatsManager.recordStatsAfterQuerySubmission(requestId, serverRoutingInstance.getInstanceId());
serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(),
timeoutMs);
asyncQueryResponse.markRequestSubmitted(serverRoutingInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ public long getCompletedTaskCount() {
}

/**
* Called when a query is submitted to a server. Updates stats corresponding to query submission.
* Called just before submitting a query to a server. Updates stats corresponding to query submission.
*/
public void recordStatsAfterQuerySubmission(long requestId, String serverInstanceId) {
public void recordStatsForQuerySubmission(long requestId, String serverInstanceId) {
if (!_isEnabled) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void testQuerySubmitAndCompletionStats() {
int requestId = 0;

// Submit stats for server1.
manager.recordStatsAfterQuerySubmission(requestId++, "server1");
manager.recordStatsForQuerySubmission(requestId++, "server1");
waitForStatsUpdate(manager, requestId);

List<Pair<String, Integer>> numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers();
Expand All @@ -156,7 +156,7 @@ public void testQuerySubmitAndCompletionStats() {
assertEquals(score, 0.0);

// Submit more stats for server 1.
manager.recordStatsAfterQuerySubmission(requestId++, "server1");
manager.recordStatsForQuerySubmission(requestId++, "server1");
waitForStatsUpdate(manager, requestId);

numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers();
Expand All @@ -181,7 +181,7 @@ public void testQuerySubmitAndCompletionStats() {
assertEquals(score, 0.0);

// Add a new server server2.
manager.recordStatsAfterQuerySubmission(requestId++, "server2");
manager.recordStatsForQuerySubmission(requestId++, "server2");
waitForStatsUpdate(manager, requestId);


Expand Down

0 comments on commit 61aa6ce

Please sign in to comment.