Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into np-48346-added-module-for-oai
Browse files Browse the repository at this point in the history
  • Loading branch information
ketilaa committed Jan 9, 2025
2 parents be3f678 + f9fcd79 commit 208de38
Show file tree
Hide file tree
Showing 143 changed files with 6,375 additions and 7,091 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# style: Reformat project with Spotless
2dfd69cbc977fa45922485f1d252d81fc0da7447

# style: Reformat project
0825cca42191e030ef7c46cc9d86c8d004370d85
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package no.unit.nva.indexingclient;

import com.fasterxml.jackson.databind.JsonNode;

import java.util.ArrayList;
import java.util.List;

Expand Down
174 changes: 85 additions & 89 deletions batch-index/src/main/java/no/unit/nva/indexingclient/BatchIndexer.java
Original file line number Diff line number Diff line change
@@ -1,106 +1,102 @@
package no.unit.nva.indexingclient;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import no.unit.nva.identifiers.SortableIdentifier;
import no.unit.nva.indexingclient.models.IndexDocument;
import no.unit.nva.s3.ListingResult;
import no.unit.nva.s3.S3Driver;

import nva.commons.core.paths.UnixPath;

import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkItemResponse.Failure;
import org.opensearch.action.bulk.BulkResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.services.s3.S3Client;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BatchIndexer implements IndexingResult<SortableIdentifier> {

private static final Logger logger = LoggerFactory.getLogger(BatchIndexer.class);
private final ImportDataRequestEvent importDataRequest;
private final S3Driver s3Driver;
private final IndexingClient openSearchRestClient;
private final int numberOfFilesPerEvent;
private IndexingResultRecord<SortableIdentifier> processingResult;

public BatchIndexer(
ImportDataRequestEvent importDataRequestEvent,
S3Client s3Client,
IndexingClient openSearchRestClient,
int numberOfFilesPerEvent) {
this.importDataRequest = importDataRequestEvent;
this.openSearchRestClient = openSearchRestClient;
this.s3Driver = new S3Driver(s3Client, importDataRequestEvent.getBucket());
this.numberOfFilesPerEvent = numberOfFilesPerEvent;
}

public IndexingResult<SortableIdentifier> processRequest() {

ListingResult listFilesResult = fetchNextPageOfFilenames();
List<IndexDocument> contents =
fileContents(listFilesResult.getFiles()).collect(Collectors.toList());
List<SortableIdentifier> failedResults = indexFileContents(contents);
this.processingResult =
new IndexingResultRecord<>(
failedResults,
listFilesResult.getListingStartingPoint(),
listFilesResult.isTruncated());

return this;
}

private Stream<IndexDocument> fileContents(List<UnixPath> files) {
return files.stream().map(s3Driver::getFile).map(IndexDocument::fromJsonString);
}

@Override
public List<SortableIdentifier> failedResults() {
return this.processingResult.failedResults();
}

@Override
public String nextStartMarker() {
return processingResult.nextStartMarker();
}

@Override
public boolean truncated() {
return this.processingResult.truncated();
}

private ListingResult fetchNextPageOfFilenames() {
return s3Driver.listFiles(
UnixPath.of(importDataRequest.getS3Path()),
importDataRequest.getStartMarker(),
numberOfFilesPerEvent);
}

private List<SortableIdentifier> indexFileContents(List<IndexDocument> contents) {

Stream<BulkResponse> result = openSearchRestClient.batchInsert(contents.stream());
List<SortableIdentifier> failures = collectFailures(result).collect(Collectors.toList());
failures.forEach(this::logFailure);
return failures;
}

private <T> void logFailure(T failureMessage) {
logger.warn("Failed to index resource:{}", failureMessage);
}

private Stream<SortableIdentifier> collectFailures(Stream<BulkResponse> indexActions) {
return indexActions
.filter(BulkResponse::hasFailures)
.map(BulkResponse::getItems)
.flatMap(Arrays::stream)
.filter(BulkItemResponse::isFailed)
.map(BulkItemResponse::getFailure)
.map(Failure::getId)
.map(SortableIdentifier::new);
}
private static final Logger logger = LoggerFactory.getLogger(BatchIndexer.class);
private final ImportDataRequestEvent importDataRequest;
private final S3Driver s3Driver;
private final IndexingClient openSearchRestClient;
private final int numberOfFilesPerEvent;
private IndexingResultRecord<SortableIdentifier> processingResult;

public BatchIndexer(
ImportDataRequestEvent importDataRequestEvent,
S3Client s3Client,
IndexingClient openSearchRestClient,
int numberOfFilesPerEvent) {
this.importDataRequest = importDataRequestEvent;
this.openSearchRestClient = openSearchRestClient;
this.s3Driver = new S3Driver(s3Client, importDataRequestEvent.getBucket());
this.numberOfFilesPerEvent = numberOfFilesPerEvent;
}

public IndexingResult<SortableIdentifier> processRequest() {

ListingResult listFilesResult = fetchNextPageOfFilenames();
List<IndexDocument> contents =
fileContents(listFilesResult.getFiles()).collect(Collectors.toList());
List<SortableIdentifier> failedResults = indexFileContents(contents);
this.processingResult =
new IndexingResultRecord<>(
failedResults,
listFilesResult.getListingStartingPoint(),
listFilesResult.isTruncated());

return this;
}

private Stream<IndexDocument> fileContents(List<UnixPath> files) {
return files.stream().map(s3Driver::getFile).map(IndexDocument::fromJsonString);
}

@Override
public List<SortableIdentifier> failedResults() {
return this.processingResult.failedResults();
}

@Override
public String nextStartMarker() {
return processingResult.nextStartMarker();
}

@Override
public boolean truncated() {
return this.processingResult.truncated();
}

private ListingResult fetchNextPageOfFilenames() {
return s3Driver.listFiles(
UnixPath.of(importDataRequest.getS3Path()),
importDataRequest.getStartMarker(),
numberOfFilesPerEvent);
}

private List<SortableIdentifier> indexFileContents(List<IndexDocument> contents) {

Stream<BulkResponse> result = openSearchRestClient.batchInsert(contents.stream());
List<SortableIdentifier> failures = collectFailures(result).collect(Collectors.toList());
failures.forEach(this::logFailure);
return failures;
}

private <T> void logFailure(T failureMessage) {
logger.warn("Failed to index resource:{}", failureMessage);
}

private Stream<SortableIdentifier> collectFailures(Stream<BulkResponse> indexActions) {
return indexActions
.filter(BulkResponse::hasFailures)
.map(BulkResponse::getItems)
.flatMap(Arrays::stream)
.filter(BulkItemResponse::isFailed)
.map(BulkItemResponse::getFailure)
.map(Failure::getId)
.map(SortableIdentifier::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import nva.commons.core.JacocoGenerated;

import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.eventbridge.EventBridgeClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,40 @@
import static no.unit.nva.indexingclient.Constants.MANDATORY_UNUSED_SUBTOPIC;

import com.amazonaws.services.lambda.runtime.Context;

import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.services.eventbridge.EventBridgeClient;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequest;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry;

import java.time.Instant;

/**
 * Utility for emitting batch-index import requests onto the EventBridge event bus.
 *
 * <p>Builds a {@code PutEventsRequestEntry} from the import request and the Lambda context, then
 * publishes it via the supplied {@link EventBridgeClient}.
 */
public final class EmitEventUtils {

  private static final Logger logger = LoggerFactory.getLogger(EmitEventUtils.class);

  private EmitEventUtils() {}

  /**
   * Publishes the given import request as an event on the batch-index event bus.
   *
   * @param eventBridgeClient client used to put the event
   * @param importDataRequest request payload serialized into the event detail
   * @param context Lambda context; its invoked-function ARN is attached as the event resource
   */
  public static void emitEvent(
      EventBridgeClient eventBridgeClient,
      ImportDataRequestEvent importDataRequest,
      Context context) {
    var putEventRequestEntry = eventEntry(importDataRequest, context);
    // Parameterized logging avoids string concatenation when debug is disabled.
    logger.debug("BusName:{}", BATCH_INDEX_EVENT_BUS_NAME);
    logger.debug("Event:{}", putEventRequestEntry);
    var putEventRequest = PutEventsRequest.builder().entries(putEventRequestEntry).build();
    var response = eventBridgeClient.putEvents(putEventRequest);
    logger.debug("{}", response);
  }

  // Assembles the EventBridge entry: bus, subtopic, source class, timestamp, JSON detail, and ARN.
  private static PutEventsRequestEntry eventEntry(
      ImportDataRequestEvent importDataRequest, Context context) {
    return PutEventsRequestEntry.builder()
        .eventBusName(BATCH_INDEX_EVENT_BUS_NAME)
        .detailType(MANDATORY_UNUSED_SUBTOPIC)
        .source(EventBasedBatchIndexer.class.getName())
        .time(Instant.now())
        .detail(importDataRequest.toJsonString())
        .resources(context.getInvokedFunctionArn())
        .build();
  }
}
Loading

0 comments on commit 208de38

Please sign in to comment.