fixed missing format #623

Closed
wants to merge 10 commits into from
batch-index/src/main/java/no/unit/nva/indexingclient/BatchIndexer.java (162 changes: 81 additions & 81 deletions)
@@ -18,85 +18,85 @@

public class BatchIndexer implements IndexingResult<SortableIdentifier> {

private static final Logger logger = LoggerFactory.getLogger(BatchIndexer.class);
private final ImportDataRequestEvent importDataRequest;
private final S3Driver s3Driver;
private final IndexingClient openSearchRestClient;
private final int numberOfFilesPerEvent;
private IndexingResultRecord<SortableIdentifier> processingResult;

public BatchIndexer(
ImportDataRequestEvent importDataRequestEvent,
S3Client s3Client,
IndexingClient openSearchRestClient,
int numberOfFilesPerEvent) {
this.importDataRequest = importDataRequestEvent;
this.openSearchRestClient = openSearchRestClient;
this.s3Driver = new S3Driver(s3Client, importDataRequestEvent.getBucket());
this.numberOfFilesPerEvent = numberOfFilesPerEvent;
}

public IndexingResult<SortableIdentifier> processRequest() {

ListingResult listFilesResult = fetchNextPageOfFilenames();
List<IndexDocument> contents =
fileContents(listFilesResult.getFiles()).collect(Collectors.toList());
List<SortableIdentifier> failedResults = indexFileContents(contents);
this.processingResult =
new IndexingResultRecord<>(
failedResults,
listFilesResult.getListingStartingPoint(),
listFilesResult.isTruncated());

return this;
}

private Stream<IndexDocument> fileContents(List<UnixPath> files) {
return files.stream().map(s3Driver::getFile).map(IndexDocument::fromJsonString);
}

@Override
public List<SortableIdentifier> failedResults() {
return this.processingResult.failedResults();
}

@Override
public String nextStartMarker() {
return processingResult.nextStartMarker();
}

@Override
public boolean truncated() {
return this.processingResult.truncated();
}

private ListingResult fetchNextPageOfFilenames() {
return s3Driver.listFiles(
UnixPath.of(importDataRequest.getS3Path()),
importDataRequest.getStartMarker(),
numberOfFilesPerEvent);
}

private List<SortableIdentifier> indexFileContents(List<IndexDocument> contents) {

Stream<BulkResponse> result = openSearchRestClient.batchInsert(contents.stream());
List<SortableIdentifier> failures = collectFailures(result).collect(Collectors.toList());
failures.forEach(this::logFailure);
return failures;
}

private <T> void logFailure(T failureMessage) {
logger.warn("Failed to index resource:{}", failureMessage);
}

private Stream<SortableIdentifier> collectFailures(Stream<BulkResponse> indexActions) {
return indexActions
.filter(BulkResponse::hasFailures)
.map(BulkResponse::getItems)
.flatMap(Arrays::stream)
.filter(BulkItemResponse::isFailed)
.map(BulkItemResponse::getFailure)
.map(Failure::getId)
.map(SortableIdentifier::new);
}
}
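
For orientation, BatchIndexer indexes one page of S3 filenames per call and reports, through truncated() and nextStartMarker(), whether another page remains. The sketch below is illustrative only and not part of this PR: it drives the class to completion in a single process instead of re-emitting EventBridge events. The BatchIndexer and ImportDataRequestEvent calls mirror the diff above; the indexAllPages helper itself, its parameters, and the java.util imports it relies on are assumptions.

// Hypothetical driver (not from this PR): indexes every page of a listing synchronously,
// collecting the identifiers that failed to index. Assumes java.util.List and
// java.util.ArrayList are imported and that the caller supplies ready-made clients.
static List<SortableIdentifier> indexAllPages(
    S3Client s3Client,
    IndexingClient indexingClient,
    ImportDataRequestEvent firstPage,
    int filesPerEvent) {
  List<SortableIdentifier> failures = new ArrayList<>();
  ImportDataRequestEvent request = firstPage;
  IndexingResult<SortableIdentifier> result;
  do {
    // Each BatchIndexer instance processes exactly one page of filenames.
    result = new BatchIndexer(request, s3Client, indexingClient, filesPerEvent).processRequest();
    failures.addAll(result.failedResults());
    // Resume the listing from the marker reported by the page just processed.
    request = new ImportDataRequestEvent(request.getS3Location(), result.nextStartMarker());
  } while (result.truncated());
  return failures;
}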
@@ -4,9 +4,7 @@

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import nva.commons.core.JacocoGenerated;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.eventbridge.EventBridgeClient;
@@ -13,31 +13,31 @@

public final class EmitEventUtils {

private static final Logger logger = LoggerFactory.getLogger(EmitEventUtils.class);

private EmitEventUtils() {}

public static void emitEvent(
EventBridgeClient eventBridgeClient,
ImportDataRequestEvent importDataRequest,
Context context) {
var putEventRequestEntry = eventEntry(importDataRequest, context);
logger.debug("BusName:" + BATCH_INDEX_EVENT_BUS_NAME);
logger.debug("Event:" + putEventRequestEntry.toString());
var putEventRequest = PutEventsRequest.builder().entries(putEventRequestEntry).build();
var response = eventBridgeClient.putEvents(putEventRequest);
logger.debug(response.toString());
}

private static PutEventsRequestEntry eventEntry(
ImportDataRequestEvent importDataRequest, Context context) {
return PutEventsRequestEntry.builder()
.eventBusName(BATCH_INDEX_EVENT_BUS_NAME)
.detailType(MANDATORY_UNUSED_SUBTOPIC)
.source(EventBasedBatchIndexer.class.getName())
.time(Instant.now())
.detail(importDataRequest.toJsonString())
.resources(context.getInvokedFunctionArn())
.build();
}
}
@@ -21,65 +21,62 @@
import software.amazon.awssdk.services.s3.S3Client;

public class EventBasedBatchIndexer
extends EventHandler<ImportDataRequestEvent, SortableIdentifier[]> {

private static final Logger logger = LoggerFactory.getLogger(EventBasedBatchIndexer.class);

private final S3Client s3Client;
private final IndexingClient openSearchClient;
private final EventBridgeClient eventBridgeClient;
private final int numberOfFilesPerEvent;

@JacocoGenerated
public EventBasedBatchIndexer() {
this(
defaultS3Client(),
defaultEsClient(),
defaultEventBridgeClient(),
NUMBER_OF_FILES_PER_EVENT_ENVIRONMENT_VARIABLE);
}

protected EventBasedBatchIndexer(
S3Client s3Client,
IndexingClient openSearchClient,
EventBridgeClient eventBridgeClient,
int numberOfFilesPerEvent) {
super(ImportDataRequestEvent.class);
this.s3Client = s3Client;
this.openSearchClient = openSearchClient;
this.eventBridgeClient = eventBridgeClient;
this.numberOfFilesPerEvent = numberOfFilesPerEvent;
}

@Override
public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) {
String inputString = IoUtils.streamToString(inputStream);
logger.debug(inputString);
super.handleRequest(IoUtils.stringToStream(inputString), outputStream, context);
}

@Override
protected SortableIdentifier[] processInput(
ImportDataRequestEvent input,
AwsEventBridgeEvent<ImportDataRequestEvent> event,
Context context) {
logger.debug("Indexing folder:" + input.getS3Location());
logger.debug("Indexing lastEvaluatedKey:" + input.getStartMarker());
IndexingResult<SortableIdentifier> result =
new BatchIndexer(input, s3Client, openSearchClient, numberOfFilesPerEvent).processRequest();
if (result.truncated() && RECURSION_ENABLED) {
emitEventToProcessNextBatch(input, context, result);
}
return result.failedResults().toArray(SortableIdentifier[]::new);
}

private void emitEventToProcessNextBatch(
ImportDataRequestEvent input, Context context, IndexingResult<SortableIdentifier> result) {
ImportDataRequestEvent newImportDataRequest =
new ImportDataRequestEvent(input.getS3Location(), result.nextStartMarker());
emitEvent(eventBridgeClient, newImportDataRequest, context);
}
}
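
As a side note, the protected constructor above lets tests or alternative deployments inject the S3, OpenSearch, and EventBridge clients instead of the environment-driven defaults used by the no-arg constructor. A minimal wiring sketch, not part of this PR, with all client instances and the page size assumed:

// Hypothetical wiring (not from this PR): same-package code, such as a test, can use the
// protected constructor to supply explicit clients; 10 files per event is an arbitrary choice.
EventBasedBatchIndexer indexer =
    new EventBasedBatchIndexer(s3Client, indexingClient, eventBridgeClient, 10);
// The handler then indexes one page per incoming event and, while results are truncated and
// recursion is enabled, emits a follow-up event so the next invocation continues from
// result.nextStartMarker().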
@@ -7,15 +7,12 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import no.unit.nva.commons.json.JsonSerializable;
import no.unit.nva.events.models.EventBody;
import nva.commons.core.JacocoGenerated;

public class ImportDataRequestEvent implements EventBody, JsonSerializable {
