diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java index 6f4463d6b4c4c..e17e3432e77f5 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java @@ -60,7 +60,7 @@ public void testCreatePit() throws IOException { pitIds.add(pitResponse.getId()); DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds); DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync); - assertTrue(deletePitResponse.getDeletePitResults().get(0).isSucceeded()); + assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId())); } @@ -79,7 +79,7 @@ public void testDeleteAllPits() throws IOException { highLevelClient()::deleteAllPitsAsync ); for (DeletePitInfo deletePitInfo : deletePitResponse.getDeletePitResults()) { - assertTrue(deletePitInfo.isSucceeded()); + assertTrue(deletePitInfo.isSuccessful()); } } } diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java index e58f2dde380de..8b509e5d19e92 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java @@ -804,7 +804,7 @@ public void testSearchWithPit() throws Exception { highLevelClient()::deletePit, highLevelClient()::deletePitAsync ); - assertTrue(deletePitResponse.getDeletePitResults().get(0).isSucceeded()); + assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId())); } } diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index 53410a87ecdba..ba71eaefa5c7a 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -56,6 +56,7 @@ public class CreatePitController { private final Task task; private final ActionListener listener; private final CreatePitRequest request; + private final PitService pitService; private static final Logger logger = LogManager.getLogger(CreatePitController.class); public static final Setting PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting( "pit.init.keep_alive", @@ -70,7 +71,8 @@ public CreatePitController( TransportSearchAction transportSearchAction, NamedWriteableRegistry namedWriteableRegistry, Task task, - ActionListener listener + ActionListener listener, + PitService pitService ) { this.searchTransportService = searchTransportService; this.clusterService = clusterService; @@ -79,6 +81,7 @@ public CreatePitController( this.task = task; this.listener = listener; this.request = request; + this.pitService = pitService; } /** @@ -263,14 +266,19 @@ private void cleanupContexts(Collection contexts, String @Override public void onResponse(DeletePitResponse response) { // this is invoke and forget call + final StringBuilder failedPitsStringBuilder = new StringBuilder(); + response.getDeletePitResults() + .stream() + .filter(r -> !r.isSuccessful()) + .forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(",")); + logger.warn(() -> new ParameterizedMessage("Failed to delete PIT IDs {}", failedPitsStringBuilder.toString())); if (!logger.isDebugEnabled()) return; - for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) { - if (!deletePitInfo.isSucceeded()) { - logger.debug(() -> new ParameterizedMessage("Failed to delete PIT ID {}", deletePitInfo.getPitId())); - } else { - logger.debug(() -> new ParameterizedMessage("Deleted PIT with ID {}", deletePitInfo.getPitId())); - } - } + final StringBuilder successfulPitsStringBuilder = new StringBuilder(); + response.getDeletePitResults() + .stream() + .filter(r -> r.isSuccessful()) + .forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(",")); + logger.debug(() -> new ParameterizedMessage("Deleted PIT with IDs {}", successfulPitsStringBuilder.toString())); } @Override @@ -284,6 +292,6 @@ public void onFailure(Exception e) { contextIdsForNode.add(new PitSearchContextIdForNode(pitId, context)); nodeToContextsMap.put(context.getNode(), contextIdsForNode); } - SearchUtils.deletePitContexts(nodeToContextsMap, deleteListener, clusterService.state(), searchTransportService); + pitService.deletePitContexts(nodeToContextsMap, deleteListener); } } diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java b/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java index d825a35c956fe..943199812771a 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java @@ -28,23 +28,23 @@ public class DeletePitInfo extends TransportResponse implements Writeable, ToXCo /** * This will be true if PIT reader contexts are deleted ond also if contexts are not found. */ - private final boolean succeeded; + private final boolean successful; private final String pitId; - public DeletePitInfo(boolean succeeded, String pitId) { - this.succeeded = succeeded; + public DeletePitInfo(boolean successful, String pitId) { + this.successful = successful; this.pitId = pitId; } public DeletePitInfo(StreamInput in) throws IOException { - succeeded = in.readBoolean(); + successful = in.readBoolean(); pitId = in.readString(); } - public boolean isSucceeded() { - return succeeded; + public boolean isSuccessful() { + return successful; } public String getPitId() { @@ -53,7 +53,7 @@ public String getPitId() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(succeeded); + out.writeBoolean(successful); out.writeString(pitId); } @@ -64,17 +64,17 @@ public void writeTo(StreamOutput out) throws IOException { ); static { - PARSER.declareBoolean(constructorArg(), new ParseField("succeeded")); + PARSER.declareBoolean(constructorArg(), new ParseField("successful")); PARSER.declareString(constructorArg(), new ParseField("pitId")); } - private static final ParseField SUCCEEDED = new ParseField("succeeded"); + private static final ParseField SUCCESSFUL = new ParseField("successful"); private static final ParseField PIT_ID = new ParseField("pitId"); @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(SUCCEEDED.getPreferredName(), succeeded); + builder.field(SUCCESSFUL.getPreferredName(), successful); builder.field(PIT_ID.getPreferredName(), pitId); builder.endObject(); return builder; diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java new file mode 100644 index 0000000000000..6fe5a44aa12e1 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; +import org.opensearch.common.inject.Inject; +import org.opensearch.transport.Transport; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** + * Service class for PIT reusable functions + */ +public class PitService { + + private static final Logger logger = LogManager.getLogger(PitService.class); + + private final ClusterService clusterService; + private final SearchTransportService searchTransportService; + + @Inject + public PitService(ClusterService clusterService, SearchTransportService searchTransportService) { + this.clusterService = clusterService; + this.searchTransportService = searchTransportService; + } + + /** + * Delete list of pit contexts. Returns the details of success of operation per PIT ID. + */ + public void deletePitContexts( + Map> nodeToContextsMap, + ActionListener listener + ) { + final Set clusters = nodeToContextsMap.values() + .stream() + .flatMap(Collection::stream) + .filter(ctx -> Strings.isEmpty(ctx.getSearchContextIdForNode().getClusterAlias()) == false) + .map(c -> c.getSearchContextIdForNode().getClusterAlias()) + .collect(Collectors.toSet()); + StepListener> lookupListener = SearchUtils.getConnectionLookupListener( + searchTransportService.getRemoteClusterService(), + clusterService.state(), + clusters + ); + lookupListener.whenComplete(nodeLookup -> { + final GroupedActionListener groupedListener = getDeletePitGroupedListener( + listener, + nodeToContextsMap.size() + ); + + for (Map.Entry> entry : nodeToContextsMap.entrySet()) { + String clusterAlias = entry.getValue().get(0).getSearchContextIdForNode().getClusterAlias(); + final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); + if (node == null) { + logger.error( + () -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode()) + ); + List deletePitInfos = new ArrayList<>(); + for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { + deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId())); + } + groupedListener.onResponse(new DeletePitResponse(deletePitInfos)); + } else { + try { + final Transport.Connection connection = searchTransportService.getConnection(clusterAlias, node); + searchTransportService.sendFreePITContexts(connection, entry.getValue(), groupedListener); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", node.getName()), e); + List deletePitInfos = new ArrayList<>(); + for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { + deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId())); + } + groupedListener.onResponse(new DeletePitResponse(deletePitInfos)); + } + } + } + }, listener::onFailure); + } + + public GroupedActionListener getDeletePitGroupedListener(ActionListener listener, int size) { + return new GroupedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(final Collection responses) { + Map pitIdToSucceededMap = new HashMap<>(); + for (DeletePitResponse response : responses) { + for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) { + if (!pitIdToSucceededMap.containsKey(deletePitInfo.getPitId())) { + pitIdToSucceededMap.put(deletePitInfo.getPitId(), deletePitInfo.isSuccessful()); + } + if (!deletePitInfo.isSuccessful()) { + logger.debug(() -> new ParameterizedMessage("Deleting PIT with ID {} failed ", deletePitInfo.getPitId())); + pitIdToSucceededMap.put(deletePitInfo.getPitId(), deletePitInfo.isSuccessful()); + } + } + } + List deletePitResults = new ArrayList<>(); + for (Map.Entry entry : pitIdToSucceededMap.entrySet()) { + deletePitResults.add(new DeletePitInfo(entry.getValue(), entry.getKey())); + } + DeletePitResponse deletePitResponse = new DeletePitResponse(deletePitResults); + listener.onResponse(deletePitResponse); + } + + @Override + public void onFailure(final Exception e) { + logger.error("Delete PITs failed", e); + listener.onFailure(e); + } + }, size); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchUtils.java b/server/src/main/java/org/opensearch/action/search/SearchUtils.java index 0d4667c0821d1..148d1645568b1 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchUtils.java +++ b/server/src/main/java/org/opensearch/action/search/SearchUtils.java @@ -8,32 +8,18 @@ package org.opensearch.action.search; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; -import org.opensearch.action.support.GroupedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.Strings; import org.opensearch.transport.RemoteClusterService; -import org.opensearch.transport.Transport; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.BiFunction; -import java.util.stream.Collectors; /** * Helper class for common search functions */ public class SearchUtils { - private static final Logger logger = LogManager.getLogger(SearchUtils.class); public SearchUtils() {} @@ -54,94 +40,4 @@ public static StepListener> getConnect } return lookupListener; } - - /** - * Delete list of pit contexts. Returns the details of success of operation per PIT ID. - */ - public static void deletePitContexts( - Map> nodeToContextsMap, - ActionListener listener, - ClusterState state, - SearchTransportService searchTransportService - ) { - final Set clusters = nodeToContextsMap.values() - .stream() - .flatMap(Collection::stream) - .filter(ctx -> Strings.isEmpty(ctx.getSearchContextIdForNode().getClusterAlias()) == false) - .map(c -> c.getSearchContextIdForNode().getClusterAlias()) - .collect(Collectors.toSet()); - StepListener> lookupListener = getConnectionLookupListener( - searchTransportService.getRemoteClusterService(), - state, - clusters - ); - lookupListener.whenComplete(nodeLookup -> { - final GroupedActionListener groupedListener = getDeletePitGroupedListener( - listener, - nodeToContextsMap.size() - ); - - for (Map.Entry> entry : nodeToContextsMap.entrySet()) { - String clusterAlias = entry.getValue().get(0).getSearchContextIdForNode().getClusterAlias(); - final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); - if (node == null) { - logger.error( - () -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode()) - ); - List deletePitInfos = new ArrayList<>(); - for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { - deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId())); - } - groupedListener.onResponse(new DeletePitResponse(deletePitInfos)); - } else { - try { - final Transport.Connection connection = searchTransportService.getConnection(clusterAlias, node); - searchTransportService.sendFreePITContexts(connection, entry.getValue(), groupedListener); - } catch (Exception e) { - logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", node.getName()), e); - List deletePitInfos = new ArrayList<>(); - for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { - deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId())); - } - groupedListener.onResponse(new DeletePitResponse(deletePitInfos)); - } - } - } - }, listener::onFailure); - } - - public static GroupedActionListener getDeletePitGroupedListener( - ActionListener listener, - int size - ) { - return new GroupedActionListener<>(new ActionListener<>() { - @Override - public void onResponse(final Collection responses) { - Map pitIdToSucceededMap = new HashMap<>(); - for (DeletePitResponse response : responses) { - for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) { - if (!pitIdToSucceededMap.containsKey(deletePitInfo.getPitId())) { - pitIdToSucceededMap.put(deletePitInfo.getPitId(), deletePitInfo.isSucceeded()); - } - if (!deletePitInfo.isSucceeded()) { - logger.debug(() -> new ParameterizedMessage("Deleting PIT with ID {} failed ", deletePitInfo.getPitId())); - pitIdToSucceededMap.put(deletePitInfo.getPitId(), deletePitInfo.isSucceeded()); - } - } - } - List deletePitResults = new ArrayList<>(); - for (Map.Entry entry : pitIdToSucceededMap.entrySet()) { - deletePitResults.add(new DeletePitInfo(entry.getValue(), entry.getKey())); - } - DeletePitResponse deletePitResponse = new DeletePitResponse(deletePitResults); - listener.onResponse(deletePitResponse); - } - - @Override - public void onFailure(final Exception e) { - logger.error("Delete PITs failed", e); - listener.onFailure(e); - } - }, size); - } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java index 3ec821dbed9c4..aff1f3c728845 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java @@ -40,6 +40,7 @@ public class TransportCreatePitAction extends HandledTransportAction new CreatePitRequest(in)); this.transportService = transportService; @@ -56,6 +58,7 @@ public TransportCreatePitAction( this.clusterService = clusterService; this.transportSearchAction = transportSearchAction; this.namedWriteableRegistry = namedWriteableRegistry; + this.pitService = pitService; } @Override @@ -67,7 +70,8 @@ protected void doExecute(Task task, CreatePitRequest request, ActionListener createPitListener = new StepListener<>(); final ActionListener updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> { diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index 89f666636ae74..d67979d1c87c5 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -32,6 +32,7 @@ public class TransportDeletePitAction extends HandledTransportAction listener, DeletePitReq nodeToContextsMap.put(contextIdForNode.getNode(), contexts); } } - SearchUtils.deletePitContexts(nodeToContextsMap, listener, clusterService.state(), searchTransportService); + pitService.deletePitContexts(nodeToContextsMap, listener); } /** @@ -85,7 +88,7 @@ private void deletePits(ActionListener listener, DeletePitReq private void deleteAllPits(ActionListener listener) { // TODO: Use list all PITs to delete all PITs in case of remote cluster use case int size = clusterService.state().getNodes().getSize(); - ActionListener groupedActionListener = SearchUtils.getDeletePitGroupedListener(listener, size); + ActionListener groupedActionListener = pitService.getDeletePitGroupedListener(listener, size); for (final DiscoveryNode node : clusterService.state().getNodes()) { try { Transport.Connection connection = searchTransportService.getConnection(null, node); diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index 50744f0a3499c..ef150b8555774 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -218,6 +218,8 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); + + PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( request, searchTransportService, @@ -225,7 +227,8 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportSearchAction, namedWriteableRegistry, task, - createPitListener + createPitListener, + pitService ); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @@ -308,7 +311,7 @@ public void sendFreePITContexts( CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - + PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( request, searchTransportService, @@ -316,7 +319,8 @@ public void sendFreePITContexts( transportSearchAction, namedWriteableRegistry, task, - createPitListener + createPitListener, + pitService ); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @@ -408,6 +412,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); + PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( request, searchTransportService, @@ -415,7 +420,8 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportSearchAction, namedWriteableRegistry, task, - createPitListener + createPitListener, + pitService ); CountDownLatch latch = new CountDownLatch(1); @@ -497,6 +503,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); + PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( request, searchTransportService, @@ -504,7 +511,8 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportSearchAction, namedWriteableRegistry, task, - createPitListener + createPitListener, + pitService ); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java index 003cd00e2219d..acddbf639b574 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -178,20 +178,22 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, namedWriteableRegistry, transportSearchAction, clusterServiceMock, - searchTransportService + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); PlainActionFuture future = newFuture(); action.execute(task, deletePITRequest, future); DeletePitResponse dr = future.get(); assertTrue(dr.getDeletePitResults().get(0).getPitId().equals("pitId")); - assertTrue(dr.getDeletePitResults().get(0).isSucceeded()); + assertTrue(dr.getDeletePitResults().get(0).isSuccessful()); assertEquals(3, deleteNodesInvoked.size()); } @@ -236,20 +238,22 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, namedWriteableRegistry, transportSearchAction, clusterServiceMock, - searchTransportService + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); PlainActionFuture future = newFuture(); action.execute(task, deletePITRequest, future); DeletePitResponse dr = future.get(); assertTrue(dr.getDeletePitResults().get(0).getPitId().equals("pitId")); - assertTrue(dr.getDeletePitResults().get(0).isSucceeded()); + assertTrue(dr.getDeletePitResults().get(0).isSuccessful()); assertEquals(3, deleteNodesInvoked.size()); } @@ -303,13 +307,15 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, namedWriteableRegistry, transportSearchAction, clusterServiceMock, - searchTransportService + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); PlainActionFuture future = newFuture(); @@ -360,13 +366,15 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, namedWriteableRegistry, transportSearchAction, clusterServiceMock, - searchTransportService + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); PlainActionFuture future = newFuture(); @@ -426,13 +434,15 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, namedWriteableRegistry, transportSearchAction, clusterServiceMock, - searchTransportService + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); PlainActionFuture future = newFuture(); @@ -486,13 +496,15 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, namedWriteableRegistry, transportSearchAction, clusterServiceMock, - searchTransportService + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); PlainActionFuture future = newFuture(); @@ -542,13 +554,15 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, namedWriteableRegistry, transportSearchAction, clusterServiceMock, - searchTransportService + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); PlainActionFuture future = newFuture(); @@ -602,13 +616,15 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; + PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, namedWriteableRegistry, transportSearchAction, clusterServiceMock, - searchTransportService + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); PlainActionFuture future = newFuture(); diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index a8b6238102786..5c3c43af9cb66 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -215,84 +215,6 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx service.doClose(); } - public void testSearchWithFirstPhaseKeepAliveExpiry() throws ExecutionException, InterruptedException { - createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); - client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - - CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueMillis(100), true); - request.setIndices(new String[] { "index" }); - ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); - CreatePitResponse pitResponse = execute.get(); - SearchService service = getInstanceFromNode(SearchService.class); - assertEquals(2, service.getActiveContexts()); - // since first phase temporary keep alive is set at 1 second in this test file - // and create pit request keep alive is less than that, keep alive is set to 1 second, (max of 2 keep alives) - // so reader context will clear up after 1 second - Thread.sleep(1200); - client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - - SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> { - client().prepareSearch() - .setSize(2) - .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) - .get(); - }); - assertTrue(ex.shardFailures()[0].reason().contains("SearchContextMissingException")); - assertEquals(0, service.getActiveContexts()); - service.doClose(); - } - - public void testSearchWithPitSecondPhaseKeepAliveExpiry() throws ExecutionException, InterruptedException { - createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); - client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueSeconds(2), true); - request.setIndices(new String[] { "index" }); - ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); - CreatePitResponse pitResponse = execute.get(); - SearchService service = getInstanceFromNode(SearchService.class); - assertEquals(2, service.getActiveContexts()); - Thread.sleep(1000); - assertEquals(2, service.getActiveContexts()); - Thread.sleep(1500); - assertEquals(0, service.getActiveContexts()); - client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> { - client().prepareSearch() - .setSize(2) - .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) - .get(); - }); - assertTrue(ex.shardFailures()[0].reason().contains("SearchContextMissingException")); - service.doClose(); - } - - public void testSearchWithPitKeepAliveExtension() throws ExecutionException, InterruptedException { - createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); - client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueSeconds(1), true); - request.setIndices(new String[] { "index" }); - ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); - CreatePitResponse pitResponse = execute.get(); - SearchService service = getInstanceFromNode(SearchService.class); - assertEquals(2, service.getActiveContexts()); - client().prepareSearch() - .setSize(2) - .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueSeconds(3))) - .get(); - client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - assertEquals(2, service.getActiveContexts()); - Thread.sleep(3500); - assertEquals(0, service.getActiveContexts()); - SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> { - client().prepareSearch("index") - .setSize(2) - .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueMinutes(1))) - .get(); - }); - assertTrue(ex.shardFailures()[0].reason().contains("SearchContextMissingException")); - service.doClose(); - } - public void testMaxOpenPitContexts() throws Exception { createIndex("index"); client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index 96f82e221f090..61d7988c7aeb5 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -76,7 +76,7 @@ public void testDeletePit() throws Exception { assertEquals(2, deletePITResponse.getDeletePitResults().size()); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertTrue(pitIds.contains(deletePitInfo.getPitId())); - assertTrue(deletePitInfo.isSucceeded()); + assertTrue(deletePitInfo.isSuccessful()); } /** * Checking deleting the same PIT id again results in succeeded @@ -85,7 +85,7 @@ public void testDeletePit() throws Exception { deletePITResponse = deleteExecute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertTrue(pitIds.contains(deletePitInfo.getPitId())); - assertTrue(deletePitInfo.isSucceeded()); + assertTrue(deletePitInfo.isSuccessful()); } } @@ -105,7 +105,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception { DeletePitResponse deletePITResponse = deleteExecute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertTrue(pitIds.contains(deletePitInfo.getPitId())); - assertTrue(deletePitInfo.isSucceeded()); + assertTrue(deletePitInfo.isSuccessful()); } execute = client().execute(CreatePitAction.INSTANCE, request); pitResponse = execute.get(); @@ -118,7 +118,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception { deletePITResponse = deleteExecute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertTrue(pitIds.contains(deletePitInfo.getPitId())); - assertTrue(deletePitInfo.isSucceeded()); + assertTrue(deletePitInfo.isSuccessful()); } } @@ -152,7 +152,7 @@ public void testDeleteAllPits() throws Exception { DeletePitResponse deletePITResponse = execute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); - assertTrue(deletePitInfo.isSucceeded()); + assertTrue(deletePitInfo.isSuccessful()); } client().admin().indices().prepareDelete("index1").get(); } @@ -175,7 +175,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { DeletePitResponse deletePITResponse = execute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertTrue(pitIds.contains(deletePitInfo.getPitId())); - assertFalse(deletePitInfo.isSucceeded()); + assertFalse(deletePitInfo.isSuccessful()); } } catch (Exception e) { throw new AssertionError(e); @@ -193,7 +193,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { DeletePitResponse deletePITResponse = execute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertTrue(pitIds.contains(deletePitInfo.getPitId())); - assertTrue(deletePitInfo.isSucceeded()); + assertTrue(deletePitInfo.isSuccessful()); } client().admin().indices().prepareDelete("index1").get(); } @@ -212,7 +212,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { DeletePitResponse deletePITResponse = execute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); - assertFalse(deletePitInfo.isSucceeded()); + assertFalse(deletePitInfo.isSuccessful()); } } catch (Exception e) { assertTrue(e.getMessage().contains("Node not connected")); @@ -230,7 +230,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { DeletePitResponse deletePITResponse = execute.get(); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); - assertTrue(deletePitInfo.isSucceeded()); + assertTrue(deletePitInfo.isSuccessful()); } client().admin().indices().prepareDelete("index1").get(); } @@ -276,7 +276,7 @@ public void testDeleteWhileSearch() throws Exception { deleted.set(true); for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { assertTrue(pitIds.contains(deletePitInfo.getPitId())); - assertTrue(deletePitInfo.isSucceeded()); + assertTrue(deletePitInfo.isSuccessful()); } for (Thread thread : threads) { diff --git a/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java b/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java index 1ff956f4b1d9d..5944e2a35b14a 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java @@ -37,7 +37,7 @@ public void testDeletePitResponseToXContent() throws IOException { deletePitResponse.toXContent(builder, ToXContent.EMPTY_PARAMS); } assertEquals(true, deletePitResponse.getDeletePitResults().get(0).getPitId().equals("pitId")); - assertEquals(true, deletePitResponse.getDeletePitResults().get(0).isSucceeded()); + assertEquals(true, deletePitResponse.getDeletePitResults().get(0).isSuccessful()); } public void testDeletePitResponseToAndFromXContent() throws IOException { @@ -50,8 +50,8 @@ public void testDeletePitResponseToAndFromXContent() throws IOException { parsedResponse = DeletePitResponse.fromXContent(parser); } assertEquals( - originalResponse.getDeletePitResults().get(0).isSucceeded(), - parsedResponse.getDeletePitResults().get(0).isSucceeded() + originalResponse.getDeletePitResults().get(0).isSuccessful(), + parsedResponse.getDeletePitResults().get(0).isSuccessful() ); assertEquals(originalResponse.getDeletePitResults().get(0).getPitId(), parsedResponse.getDeletePitResults().get(0).getPitId()); BytesReference parsedBytes = XContentHelper.toXContent(parsedResponse, xContentType, randomBoolean()); diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index b7a5b45259308..cbfcb43c22a21 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1432,10 +1432,10 @@ public void testDeletePitReaderContext() { assertThat(searchService.getActiveContexts(), equalTo(1)); DeletePitResponse deletePitResponse = searchService.freeReaderContextsIfFound(contextIds); - assertTrue(deletePitResponse.getDeletePitResults().get(0).isSucceeded()); + assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); // assert true for reader context not found deletePitResponse = searchService.freeReaderContextsIfFound(contextIds); - assertTrue(deletePitResponse.getDeletePitResults().get(0).isSucceeded()); + assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); // adding this assert to showcase behavior difference assertFalse(searchService.freeReaderContext(future.actionGet())); }