From d3938840288278568ccf0259340e19dbf067fd7f Mon Sep 17 00:00:00 2001 From: Manasvini B S Date: Mon, 19 Aug 2024 18:54:26 -0700 Subject: [PATCH] Addressed comments Signed-off-by: Manasvini B S --- .../sql/legacy/cursor/DefaultCursor.java | 70 ++++++++++--------- .../legacy/pit/PointInTimeHandlerImpl.java | 39 +++++------ .../pit/PointInTimeHandlerImplTest.java | 15 ++-- 3 files changed, 64 insertions(+), 60 deletions(-) diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java b/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java index 3b8fc802d1..75123ae24a 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java @@ -13,20 +13,16 @@ import com.google.common.base.Strings; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.Base64; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.Setter; -import lombok.SneakyThrows; import org.json.JSONArray; import org.json.JSONObject; import org.opensearch.common.settings.Settings; @@ -115,7 +111,7 @@ public class DefaultCursor implements Cursor { */ private static final NamedXContentRegistry xContentRegistry = new NamedXContentRegistry( - new SearchModule(Settings.builder().build(), new ArrayList<>()).getNamedXContents()); + new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents()); @Override public CursorType getType() { @@ -124,11 +120,7 @@ public CursorType getType() { @Override public String generateCursorId() { - boolean isCursorValid = - LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER) - ? Strings.isNullOrEmpty(pitId) - : Strings.isNullOrEmpty(scrollId); - if (rowsLeft <= 0 || isCursorValid) { + if (rowsLeft <= 0 || isCursorValid()) { return null; } JSONObject json = new JSONObject(); @@ -156,14 +148,18 @@ public String generateCursorId() { return String.format("%s:%s", type.getId(), encodeCursor(json, searchSourceBuilder)); } - @SneakyThrows + private boolean isCursorValid() { + return LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER) + ? Strings.isNullOrEmpty(pitId) + : Strings.isNullOrEmpty(scrollId); + } + public static DefaultCursor from(String cursorId) { /** * It is assumed that cursorId here is the second part of the original cursor passed by the * client after removing first part which identifies cursor type */ - String[] parts = cursorId.split(":::"); - JSONObject json = decodeCursor(parts[0]); + JSONObject json = decodeCursor(cursorId); DefaultCursor cursor = new DefaultCursor(); cursor.setFetchSize(json.getInt(FETCH_SIZE)); cursor.setRowsLeft(json.getLong(ROWS_LEFT)); @@ -183,14 +179,20 @@ public static DefaultCursor from(String cursorId) { }); cursor.setSortFields(sortFieldValue); - byte[] bytes = Base64.getDecoder().decode(parts[1]); + // Retrieve the SearchSourceBuilder from the JSON field + String searchSourceBuilderBase64 = json.getString("searchSourceBuilder"); + byte[] bytes = Base64.getDecoder().decode(searchSourceBuilderBase64); ByteArrayInputStream streamInput = new ByteArrayInputStream(bytes); - XContentParser parser = - XContentType.JSON - .xContent() - .createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput); - SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser); - cursor.searchSourceBuilder = sourceBuilder; + try { + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput); + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser); + cursor.searchSourceBuilder = sourceBuilder; + } catch (IOException ex) { + throw new RuntimeException("Failed to get searchSourceBuilder from cursor Id", ex); + } } else { cursor.setScrollId(json.getString(SCROLL_ID)); } @@ -220,18 +222,20 @@ private JSONObject schemaEntry(String name, String alias, String type) { return entry; } - @SneakyThrows private static String encodeCursor(JSONObject cursorJson, SearchSourceBuilder sourceBuilder) { - String jsonBase64 = Base64.getEncoder().encodeToString(cursorJson.toString().getBytes()); - - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - XContentBuilder builder = XContentFactory.jsonBuilder(outputStream); - sourceBuilder.toXContent(builder, null); - builder.close(); - - String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray()); - - return jsonBase64 + ":::" + searchRequestBase64; + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + XContentBuilder builder = XContentFactory.jsonBuilder(outputStream); + sourceBuilder.toXContent(builder, null); + builder.close(); + + String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray()); + cursorJson.put("searchSourceBuilder", searchRequestBase64); + + return Base64.getEncoder().encodeToString(cursorJson.toString().getBytes()); + } catch (IOException ex) { + throw new RuntimeException("Failed to encode cursor", ex); + } } private static JSONObject decodeCursor(String cursorId) { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java index b5b9827a9b..d9768365b0 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java @@ -7,6 +7,8 @@ import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import lombok.Getter; import lombok.Setter; import org.apache.logging.log4j.LogManager; @@ -24,8 +26,6 @@ public class PointInTimeHandlerImpl implements PointInTimeHandler { private Client client; private String[] indices; @Getter @Setter private String pitId; - private Boolean deleteStatus = null; - private Boolean createStatus = null; private static final Logger LOG = LogManager.getLogger(); /** @@ -57,6 +57,8 @@ public PointInTimeHandlerImpl(Client client, String pitId) { */ @Override public boolean create() { + CompletableFuture future = new CompletableFuture<>(); + CreatePitRequest createPitRequest = new CreatePitRequest( LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE), false, indices); @@ -67,24 +69,21 @@ public boolean create() { @Override public void onResponse(CreatePitResponse createPitResponse) { pitId = createPitResponse.getId(); - createStatus = true; LOG.info("Created Point In Time {} successfully.", pitId); + future.complete(true); } @Override public void onFailure(Exception e) { - createStatus = false; LOG.error("Error occurred while creating PIT", e); + future.completeExceptionally(new RuntimeException("Failed to create PIT", e)); } }); - while (createStatus == null) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - LOG.error("Error occurred while creating PIT", e); - } + try { + return future.join(); // Block until the PIT creation is complete, handling exceptions + } catch (CompletionException e) { + throw new RuntimeException("Failed to create PIT", e.getCause()); } - return createStatus; } /** @@ -94,33 +93,31 @@ public void onFailure(Exception e) { */ @Override public boolean delete() { + CompletableFuture future = new CompletableFuture<>(); + DeletePitRequest deletePitRequest = new DeletePitRequest(pitId); client.deletePits( deletePitRequest, new ActionListener<>() { @Override public void onResponse(DeletePitResponse deletePitResponse) { - deleteStatus = true; LOG.info( "Delete Point In Time {} status: {}", pitId, deletePitResponse.status().getStatus()); + future.complete(true); } @Override public void onFailure(Exception e) { - deleteStatus = false; LOG.error("Error occurred while deleting PIT", e); + future.completeExceptionally(new RuntimeException("Failed to delete PIT", e)); } }); - - while (deleteStatus == null) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - LOG.error("Error occurred while deleting PIT", e); - } + try { + return future.join(); // Block until the PIT deletion is complete, handling exceptions + } catch (CompletionException e) { + throw new RuntimeException("Failed to delete PIT", e.getCause()); } - return deleteStatus; } } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImplTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImplTest.java index e17e554952..e300488e78 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImplTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImplTest.java @@ -4,8 +4,7 @@ */ package org.opensearch.sql.legacy.pit; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -94,10 +93,12 @@ public void testCreateForFailure() { .when(mockClient) .createPit(any(), listenerCaptorForCreate.capture()); - boolean status = pointInTimeHandlerImpl.create(); + RuntimeException thrownException = + assertThrows(RuntimeException.class, () -> pointInTimeHandlerImpl.create()); + assertNotNull(thrownException.getCause()); + assertEquals("Failed to create PIT", thrownException.getMessage()); verify(mockClient).createPit(any(), listenerCaptorForCreate.capture()); listenerCaptorForCreate.getValue().onResponse(mockCreatePitResponse); - assertFalse(status); } @Test @@ -128,8 +129,10 @@ public void testDeleteForFailure() { .when(mockClient) .deletePits(any(), listenerCaptorForDelete.capture()); - boolean status = pointInTimeHandlerImpl.delete(); - assertFalse(status); + RuntimeException thrownException = + assertThrows(RuntimeException.class, () -> pointInTimeHandlerImpl.delete()); + assertNotNull(thrownException.getCause()); + assertEquals("Failed to delete PIT", thrownException.getMessage()); verify(mockClient).deletePits(any(), listenerCaptorForDelete.capture()); listenerCaptorForDelete.getValue().onResponse(mockDeletePitResponse); }