Skip to content

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
Signed-off-by: Manasvini B S <[email protected]>
  • Loading branch information
manasvinibs committed Aug 20, 2024
1 parent 167dce0 commit d393884
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,16 @@
import com.google.common.base.Strings;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.Setter;
import lombok.SneakyThrows;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -115,7 +111,7 @@ public class DefaultCursor implements Cursor {
*/
private static final NamedXContentRegistry xContentRegistry =
new NamedXContentRegistry(
new SearchModule(Settings.builder().build(), new ArrayList<>()).getNamedXContents());
new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());

@Override
public CursorType getType() {
Expand All @@ -124,11 +120,7 @@ public CursorType getType() {

@Override
public String generateCursorId() {
boolean isCursorValid =
LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
? Strings.isNullOrEmpty(pitId)
: Strings.isNullOrEmpty(scrollId);
if (rowsLeft <= 0 || isCursorValid) {
if (rowsLeft <= 0 || isCursorValid()) {
return null;
}
JSONObject json = new JSONObject();
Expand Down Expand Up @@ -156,14 +148,18 @@ public String generateCursorId() {
return String.format("%s:%s", type.getId(), encodeCursor(json, searchSourceBuilder));
}

@SneakyThrows
private boolean isCursorValid() {
return LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
? Strings.isNullOrEmpty(pitId)
: Strings.isNullOrEmpty(scrollId);
}

public static DefaultCursor from(String cursorId) {
/**
* It is assumed that cursorId here is the second part of the original cursor passed by the
* client after removing first part which identifies cursor type
*/
String[] parts = cursorId.split(":::");
JSONObject json = decodeCursor(parts[0]);
JSONObject json = decodeCursor(cursorId);
DefaultCursor cursor = new DefaultCursor();
cursor.setFetchSize(json.getInt(FETCH_SIZE));
cursor.setRowsLeft(json.getLong(ROWS_LEFT));
Expand All @@ -183,14 +179,20 @@ public static DefaultCursor from(String cursorId) {
});
cursor.setSortFields(sortFieldValue);

byte[] bytes = Base64.getDecoder().decode(parts[1]);
// Retrieve the SearchSourceBuilder from the JSON field
String searchSourceBuilderBase64 = json.getString("searchSourceBuilder");
byte[] bytes = Base64.getDecoder().decode(searchSourceBuilderBase64);
ByteArrayInputStream streamInput = new ByteArrayInputStream(bytes);
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser);
cursor.searchSourceBuilder = sourceBuilder;
try {
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser);
cursor.searchSourceBuilder = sourceBuilder;
} catch (IOException ex) {
throw new RuntimeException("Failed to get searchSourceBuilder from cursor Id", ex);
}
} else {
cursor.setScrollId(json.getString(SCROLL_ID));
}
Expand Down Expand Up @@ -220,18 +222,20 @@ private JSONObject schemaEntry(String name, String alias, String type) {
return entry;
}

@SneakyThrows
private static String encodeCursor(JSONObject cursorJson, SearchSourceBuilder sourceBuilder) {
String jsonBase64 = Base64.getEncoder().encodeToString(cursorJson.toString().getBytes());

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
XContentBuilder builder = XContentFactory.jsonBuilder(outputStream);
sourceBuilder.toXContent(builder, null);
builder.close();

String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray());

return jsonBase64 + ":::" + searchRequestBase64;
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
XContentBuilder builder = XContentFactory.jsonBuilder(outputStream);
sourceBuilder.toXContent(builder, null);
builder.close();

String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray());
cursorJson.put("searchSourceBuilder", searchRequestBase64);

return Base64.getEncoder().encodeToString(cursorJson.toString().getBytes());
} catch (IOException ex) {
throw new RuntimeException("Failed to encode cursor", ex);
}
}

private static JSONObject decodeCursor(String cursorId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
Expand All @@ -24,8 +26,6 @@ public class PointInTimeHandlerImpl implements PointInTimeHandler {
private Client client;
private String[] indices;
@Getter @Setter private String pitId;
private Boolean deleteStatus = null;
private Boolean createStatus = null;
private static final Logger LOG = LogManager.getLogger();

/**
Expand Down Expand Up @@ -57,6 +57,8 @@ public PointInTimeHandlerImpl(Client client, String pitId) {
*/
@Override
public boolean create() {
CompletableFuture<Boolean> future = new CompletableFuture<>();

CreatePitRequest createPitRequest =
new CreatePitRequest(
LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE), false, indices);
Expand All @@ -67,24 +69,21 @@ public boolean create() {
@Override
public void onResponse(CreatePitResponse createPitResponse) {
pitId = createPitResponse.getId();
createStatus = true;
LOG.info("Created Point In Time {} successfully.", pitId);
future.complete(true);
}

@Override
public void onFailure(Exception e) {
createStatus = false;
LOG.error("Error occurred while creating PIT", e);
future.completeExceptionally(new RuntimeException("Failed to create PIT", e));
}
});
while (createStatus == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("Error occurred while creating PIT", e);
}
try {
return future.join(); // Block until the PIT creation is complete, handling exceptions
} catch (CompletionException e) {
throw new RuntimeException("Failed to create PIT", e.getCause());
}
return createStatus;
}

/**
Expand All @@ -94,33 +93,31 @@ public void onFailure(Exception e) {
*/
@Override
public boolean delete() {
CompletableFuture<Boolean> future = new CompletableFuture<>();

DeletePitRequest deletePitRequest = new DeletePitRequest(pitId);
client.deletePits(
deletePitRequest,
new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse deletePitResponse) {
deleteStatus = true;
LOG.info(
"Delete Point In Time {} status: {}",
pitId,
deletePitResponse.status().getStatus());
future.complete(true);
}

@Override
public void onFailure(Exception e) {
deleteStatus = false;
LOG.error("Error occurred while deleting PIT", e);
future.completeExceptionally(new RuntimeException("Failed to delete PIT", e));
}
});

while (deleteStatus == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("Error occurred while deleting PIT", e);
}
try {
return future.join(); // Block until the PIT deletion is complete, handling exceptions
} catch (CompletionException e) {
throw new RuntimeException("Failed to delete PIT", e.getCause());
}
return deleteStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
*/
package org.opensearch.sql.legacy.pit;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -94,10 +93,12 @@ public void testCreateForFailure() {
.when(mockClient)
.createPit(any(), listenerCaptorForCreate.capture());

boolean status = pointInTimeHandlerImpl.create();
RuntimeException thrownException =
assertThrows(RuntimeException.class, () -> pointInTimeHandlerImpl.create());
assertNotNull(thrownException.getCause());
assertEquals("Failed to create PIT", thrownException.getMessage());
verify(mockClient).createPit(any(), listenerCaptorForCreate.capture());
listenerCaptorForCreate.getValue().onResponse(mockCreatePitResponse);
assertFalse(status);
}

@Test
Expand Down Expand Up @@ -128,8 +129,10 @@ public void testDeleteForFailure() {
.when(mockClient)
.deletePits(any(), listenerCaptorForDelete.capture());

boolean status = pointInTimeHandlerImpl.delete();
assertFalse(status);
RuntimeException thrownException =
assertThrows(RuntimeException.class, () -> pointInTimeHandlerImpl.delete());
assertNotNull(thrownException.getCause());
assertEquals("Failed to delete PIT", thrownException.getMessage());
verify(mockClient).deletePits(any(), listenerCaptorForDelete.capture());
listenerCaptorForDelete.getValue().onResponse(mockDeletePitResponse);
}
Expand Down

0 comments on commit d393884

Please sign in to comment.