Skip to content

Commit

Permalink
Moved SHARD_ID to DEFAULT_SHARD_ID (#597)
Browse files Browse the repository at this point in the history
  • Loading branch information
StigNorland authored Oct 14, 2024
1 parent 7a876f9 commit 6095919
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected Void processInputPayload(
AwsEventBridgeEvent<AwsEventBridgeDetail<DeleteImportCandidateEvent>> event,
Context context) {
try {
indexingClient.removeDocumentFromImportCandidateIndex(input.getIdentifier().toString());
indexingClient.removeDocumentFromImportCandidateIndex(input.identifier().toString());
logger.info(REMOVED_FROM_INDEX_MESSAGE);
} catch (Exception e) {
logError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,17 @@
import no.unit.nva.commons.json.JsonSerializable;
import no.unit.nva.identifiers.SortableIdentifier;

import nva.commons.core.JacocoGenerated;

import java.util.Objects;

public class DeleteImportCandidateEvent implements JsonSerializable {
public record DeleteImportCandidateEvent(String topic, SortableIdentifier identifier) implements JsonSerializable {

public static final String EVENT_TOPIC = "ImportCandidates.ExpandedEntry.Deleted";
public static final String TOPIC = "topic";
public static final String IDENTIFIER = "identifier";

private final String topic;
private final SortableIdentifier identifier;

@JsonCreator
public DeleteImportCandidateEvent(
@JsonProperty(TOPIC) String topic,
@JsonProperty(IDENTIFIER) SortableIdentifier identifier) {
this.topic = topic;
this.identifier = identifier;
}

@JacocoGenerated
public String getTopic() {
return topic;
}

public SortableIdentifier getIdentifier() {
return identifier;
}

@JacocoGenerated
@Override
public int hashCode() {
return Objects.hash(getTopic(), getIdentifier());
}

@Override
@JacocoGenerated
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DeleteImportCandidateEvent that = (DeleteImportCandidateEvent) o;
return topic.equals(that.topic) && identifier.equals(that.identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class Defaults {
List.of(MediaType.JSON_UTF_8, MediaTypes.APPLICATION_JSON_LD, MediaType.CSV_UTF_8);

public static final int ZERO_RESULTS_AGGREGATION_ONLY = 0;
public static final String DEFAULT_SHARD_ID = "0";

@JacocoGenerated
public Defaults() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package no.unit.nva.indexingclient;

import static no.unit.nva.constants.Defaults.DEFAULT_SHARD_ID;
import static no.unit.nva.constants.Words.IMPORT_CANDIDATES_INDEX;
import static no.unit.nva.constants.Words.RESOURCES;
import static no.unit.nva.indexingclient.models.Constants.SHARD_ID;
import static no.unit.nva.indexingclient.models.RestHighLevelClientWrapper.defaultRestHighLevelClientWrapper;

import static nva.commons.core.attempt.Try.attempt;
Expand All @@ -27,6 +27,7 @@
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
Expand All @@ -50,8 +51,7 @@ public class IndexingClient extends AuthenticatedOpenSearchClientWrapper {

public static final int BULK_SIZE = 100;
private static final Logger logger = LoggerFactory.getLogger(IndexingClient.class);
// public static final objectMapperWithEmpty objectMapperWithEmpty =
// JsonUtils.dtoObjectMapper;

private static final String INITIAL_LOG_MESSAGE = "Adding document [{}] to -> {}";
private static final String DOCUMENT_WITH_ID_WAS_NOT_FOUND_IN_SEARCH_INFRASTRUCTURE =
"Document with id={} was not found in search infrastructure";
Expand Down Expand Up @@ -97,21 +97,17 @@ public Void addDocumentToIndex(IndexDocument indexDocument) throws IOException {
public void removeDocumentFromResourcesIndex(String identifier) throws IOException {
var deleteRequest = deleteDocumentRequest(RESOURCES, identifier);
var deleteResponse = openSearchClient.delete(deleteRequest, getRequestOptions());
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
logger.warn(DOCUMENT_WITH_ID_WAS_NOT_FOUND_IN_SEARCH_INFRASTRUCTURE, identifier);
}
loggWarningIfNotFound(identifier, deleteResponse);
}

private static DeleteRequest deleteDocumentRequest(String index, String identifier) {
return new DeleteRequest(index, identifier).routing(SHARD_ID);
return new DeleteRequest(index, identifier).routing(DEFAULT_SHARD_ID);
}

public void removeDocumentFromImportCandidateIndex(String identifier) throws IOException {
var deleteRequest = deleteDocumentRequest(IMPORT_CANDIDATES_INDEX, identifier);
var deleteResponse = openSearchClient.delete(deleteRequest, getRequestOptions());
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
logger.warn(DOCUMENT_WITH_ID_WAS_NOT_FOUND_IN_SEARCH_INFRASTRUCTURE, identifier);
}
loggWarningIfNotFound(identifier, deleteResponse);
}

public Void createIndex(String indexName) throws IOException {
Expand Down Expand Up @@ -141,20 +137,6 @@ public Stream<BulkResponse> batchInsert(Stream<IndexDocument> contents) {
return batches.map(attempt(this::insertBatch)).map(Try::orElseThrow);
}

public Void deleteIndex(String indexName) throws IOException {
openSearchClient.indices().delete(new DeleteIndexRequest(indexName), getRequestOptions());
return null;
}

public JsonNode getMapping(String indexName) {
return attempt(() -> getMappingMetadata(indexName))
.map(MappingMetadata::source)
.map(CompressedXContent::uncompressed)
.map(BytesReference::utf8ToString)
.map(JsonUtils.dtoObjectMapper::readTree)
.orElseThrow();
}

private Stream<List<IndexDocument>> splitStreamToBatches(Stream<IndexDocument> indexDocuments) {
UnmodifiableIterator<List<IndexDocument>> bulks =
Iterators.partition(indexDocuments.iterator(), BULK_SIZE);
Expand All @@ -173,6 +155,26 @@ private BulkResponse insertBatch(List<IndexDocument> bulk) throws IOException {
return openSearchClient.bulk(request, getRequestOptions());
}

public Void deleteIndex(String indexName) throws IOException {
openSearchClient.indices().delete(new DeleteIndexRequest(indexName), getRequestOptions());
return null;
}

public JsonNode getMapping(String indexName) {
return attempt(() -> getMappingMetadata(indexName))
.map(MappingMetadata::source)
.map(CompressedXContent::uncompressed)
.map(BytesReference::utf8ToString)
.map(JsonUtils.dtoObjectMapper::readTree)
.orElseThrow();
}

private void loggWarningIfNotFound(String identifier, DeleteResponse deleteResponse) {
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
logger.warn(DOCUMENT_WITH_ID_WAS_NOT_FOUND_IN_SEARCH_INFRASTRUCTURE, identifier);
}
}

private MappingMetadata getMappingMetadata(String indexName) throws IOException {
var request = new GetMappingsRequest().indices(indexName);
return openSearchClient
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package no.unit.nva.indexingclient.models;

import static no.unit.nva.constants.Defaults.DEFAULT_SHARD_ID;
import static no.unit.nva.constants.Defaults.objectMapperWithEmpty;
import static no.unit.nva.constants.ErrorMessages.MISSING_IDENTIFIER_IN_RESOURCE;
import static no.unit.nva.constants.ErrorMessages.MISSING_INDEX_NAME_IN_RESOURCE;
Expand All @@ -8,7 +9,6 @@
import static no.unit.nva.constants.Words.IMPORT_CANDIDATES_INDEX;
import static no.unit.nva.constants.Words.RESOURCES;
import static no.unit.nva.constants.Words.TICKETS;
import static no.unit.nva.indexingclient.models.Constants.SHARD_ID;

import static nva.commons.core.attempt.Try.attempt;

Expand Down Expand Up @@ -80,7 +80,7 @@ public String getDocumentIdentifier() {
public IndexRequest toIndexRequest() {
return new IndexRequest(getIndexName())
.source(serializeResource(), XContentType.JSON)
.routing(SHARD_ID)
.routing(DEFAULT_SHARD_ID)
.id(getDocumentIdentifier());
}

Expand Down

0 comments on commit 6095919

Please sign in to comment.