forked from opensearch-project/data-prepper
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactoring and fix for e2e acks (#1)
* Refactoring and fix for e2e acks * fix: remove WIP code from StreamRecordValidator
- Loading branch information
Showing
20 changed files
with
725 additions
and
538 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
73 changes: 73 additions & 0 deletions
73
...in/java/org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneStreamClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.source.neptune.client; | ||
|
||
import lombok.Getter; | ||
import org.opensearch.dataprepper.plugins.source.neptune.configuration.NeptuneSourceConfig; | ||
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.NeptuneStreamRecord; | ||
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.StreamPosition; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.services.neptunedata.model.StreamRecordsNotFoundException; | ||
|
||
import java.time.Duration; | ||
import java.util.List; | ||
|
||
public class NeptuneStreamClient { | ||
private static final Logger LOG = LoggerFactory.getLogger(NeptuneStreamClient.class); | ||
private static final long MAX_BACKOFF_TIME = 60; | ||
private final NeptuneDataClientWrapper dataClient; | ||
private final NeptuneStreamEventListener listener; | ||
|
||
@Getter | ||
private StreamPosition streamPositionInfo; | ||
private long retryCount; | ||
|
||
public NeptuneStreamClient(final NeptuneSourceConfig config, final int batchSize, final NeptuneStreamEventListener listener) { | ||
this.dataClient = new NeptuneDataClientWrapper(config, batchSize); | ||
this.listener = listener; | ||
this.streamPositionInfo = StreamPosition.empty(); | ||
this.retryCount = 0; | ||
} | ||
|
||
|
||
public void setStreamPosition(final long commitNum, final long opNum) { | ||
streamPositionInfo = new StreamPosition(commitNum, opNum); | ||
} | ||
|
||
public void start() throws InterruptedException { | ||
while (!Thread.currentThread().isInterrupted() && !listener.shouldStopNeptuneStream(streamPositionInfo)) { | ||
try { | ||
final List<NeptuneStreamRecord> streamRecords = | ||
this.dataClient.getStreamRecords(streamPositionInfo.getCommitNum(), streamPositionInfo.getOpNum()); | ||
retryCount = 0; | ||
if (!streamRecords.isEmpty()) { | ||
final NeptuneStreamRecord<?> lastRecord = streamRecords.get(streamRecords.size() - 1); | ||
setStreamPosition(lastRecord.getCommitNum(), lastRecord.getOpNum()); | ||
} | ||
listener.onNeptuneStreamEvents(streamRecords, streamPositionInfo); | ||
} catch (final StreamRecordsNotFoundException exception) { | ||
final long nextBackoff = getNextBackoff(); | ||
LOG.info("Stream is up-to-date, Sleeping for {} seconds before retrying again.", nextBackoff); | ||
Thread.sleep(Duration.ofSeconds(nextBackoff).toMillis()); | ||
} catch (final Exception exception) { | ||
if (!listener.onNeptuneStreamException(exception, streamPositionInfo)) { | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
|
||
private long getNextBackoff() { | ||
final long nextBackoff = (long) Math.pow(2.0f, retryCount++); | ||
return Math.min(MAX_BACKOFF_TIME, nextBackoff); | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
.../org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneStreamEventListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.source.neptune.client; | ||
|
||
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.NeptuneStreamRecord; | ||
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.StreamPosition; | ||
|
||
import java.util.List; | ||
|
||
public interface NeptuneStreamEventListener { | ||
/** | ||
* @param records the records returned from the Stream. | ||
* @param streamPosition current commitNum and OpNum in the stream. | ||
*/ | ||
void onNeptuneStreamEvents(final List<NeptuneStreamRecord> records, final StreamPosition streamPosition); | ||
|
||
/** | ||
* | ||
* @param exception | ||
* @param streamPosition current commitNum and OpNum in the stream. | ||
* @return boolean if the execution should continue after that exception is encountered. | ||
*/ | ||
boolean onNeptuneStreamException(final Exception exception, final StreamPosition streamPosition); | ||
|
||
/** | ||
* | ||
* @param streamPosition | ||
* @return boolean if stream processing should be stopped | ||
*/ | ||
boolean shouldStopNeptuneStream(final StreamPosition streamPosition); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
131 changes: 131 additions & 0 deletions
131
...ain/java/org/opensearch/dataprepper/plugins/source/neptune/converter/NeptuneS3Record.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
/* | ||
* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.source.neptune.converter; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import org.apache.commons.lang3.tuple.ImmutablePair; | ||
import org.eclipse.rdf4j.model.Literal; | ||
import org.eclipse.rdf4j.model.Statement; | ||
import org.eclipse.rdf4j.model.vocabulary.RDF; | ||
import org.opensearch.dataprepper.plugins.source.neptune.stream.StreamUtils; | ||
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.NeptuneStreamRecord; | ||
import software.amazon.awssdk.services.neptunedata.model.PropertygraphData; | ||
import software.amazon.awssdk.services.neptunedata.model.SparqlData; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* Represents a single record stored in S3. | ||
*/ | ||
@Builder | ||
@Getter | ||
public class NeptuneS3Record { | ||
|
||
private static final String DOCUMENT_TYPE_VERTEX = "vertex"; | ||
private static final String DOCUMENT_TYPE_EDGE = "edge"; | ||
private static final String DOCUMENT_TYPE_RDF = "rdf-resource"; | ||
private static final String VERTEX_ID_PREFIX = "v://"; | ||
private static final String EDGE_ID_PREFIX = "e://"; | ||
private static final String PG_ENTITY_ID_KEY = "label"; | ||
|
||
// Reference to Neptune entity corresponding to document. For Gremlin, it will be Vertex Id for Vertex document & | ||
// Edge ID for Edge Document. For Sparql, it will be RDF subject URI | ||
@JsonProperty("entity_id") | ||
private String entityId; | ||
|
||
// Store the Neptune entity type. Vertex/Edge label for gremlin. rdf:type for Sparql | ||
@JsonProperty("entity_type") | ||
private String entityType; | ||
|
||
// Classify Open Search document. It could be one of vertex / edge / rdf-resource | ||
@JsonProperty("document_type") | ||
private String documentType; | ||
|
||
// Nested Field for storing predicate corresponding to Graph vertex / Edge | ||
@JsonProperty("predicate") | ||
private ImmutablePair<String, NeptuneS3RecordPredicate> predicate; | ||
|
||
private static NeptuneS3Record convertPropertyGraphStreamRecord(final NeptuneStreamRecord record) { | ||
if (!(record.getData() instanceof PropertygraphData)) { | ||
throw new IllegalArgumentException("Data must be a PropertygraphData"); | ||
} | ||
|
||
String entityType = null; | ||
ImmutablePair<String, NeptuneS3RecordPredicate> predicate = null; | ||
final PropertygraphData propertygraphData = ((PropertygraphData) record.getData()); | ||
final String key = propertygraphData.key(); | ||
if (key.equalsIgnoreCase(PG_ENTITY_ID_KEY)) { | ||
entityType = propertygraphData.value().asMap().get("value").asString(); | ||
} else { | ||
predicate = ImmutablePair.of(key, NeptuneS3RecordPredicate.fromPropertGraphData(propertygraphData)); | ||
} | ||
|
||
return NeptuneS3Record | ||
.builder() | ||
.entityId(getEntityIdForPropertyGraph(propertygraphData.type(), propertygraphData.id())) | ||
.documentType(getDocumentTypeForPropertyGraph(propertygraphData.type())) | ||
.entityType(entityType) | ||
.predicate(predicate) | ||
.build(); | ||
} | ||
|
||
private static NeptuneS3Record convertSparqlStreamRecord(final NeptuneStreamRecord record) throws IOException { | ||
if (!(record.getData() instanceof SparqlData)) { | ||
throw new IllegalArgumentException("Data must be a SparqlData"); | ||
} | ||
final Statement stmt = StreamUtils.parseSparqlStatement(((SparqlData) record.getData()).stmt()); | ||
final boolean isTypeStmt = stmt.getPredicate().isIRI() && stmt.getPredicate().equals(RDF.TYPE); | ||
ImmutablePair<String, NeptuneS3RecordPredicate> predicate = null; | ||
String entityType = null; | ||
|
||
if (isTypeStmt) { | ||
entityType = stmt.getObject().stringValue(); | ||
} else { | ||
final String predicateName = stmt.getPredicate().stringValue(); | ||
final NeptuneS3RecordPredicate predicateValue = NeptuneS3RecordPredicate | ||
.builder() | ||
.value(stmt.getObject().stringValue()) | ||
.graph(stmt.getContext().stringValue()) | ||
.language(stmt.getObject() instanceof Literal ? ((Literal) stmt.getObject()).getLanguage().orElse(null) : null) | ||
.build(); | ||
predicate = ImmutablePair.of(predicateName, predicateValue); | ||
} | ||
return NeptuneS3Record | ||
.builder() | ||
.entityId(stmt.getSubject().stringValue()) | ||
.documentType(DOCUMENT_TYPE_RDF) | ||
.entityType(entityType) | ||
.predicate(predicate) | ||
.build(); | ||
|
||
} | ||
|
||
public static NeptuneS3Record fromNeptuneStreamRecord(final NeptuneStreamRecord neptuneStreamRecord) throws IOException { | ||
if (neptuneStreamRecord.getData() instanceof PropertygraphData) { | ||
return convertPropertyGraphStreamRecord(neptuneStreamRecord); | ||
} | ||
return convertSparqlStreamRecord(neptuneStreamRecord); | ||
} | ||
|
||
private static String getEntityIdForPropertyGraph(final String type, final String entityId) { | ||
return String.format("%s%s", type.startsWith("v") ? VERTEX_ID_PREFIX : EDGE_ID_PREFIX, entityId); | ||
} | ||
|
||
private static String getDocumentTypeForPropertyGraph(final String type) { | ||
if (type.equalsIgnoreCase("vl") || type.equalsIgnoreCase("vp")) { | ||
return DOCUMENT_TYPE_VERTEX; | ||
} | ||
return DOCUMENT_TYPE_EDGE; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
...opensearch/dataprepper/plugins/source/neptune/converter/NeptuneStreamRecordValidator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.source.neptune.converter; | ||
|
||
import org.eclipse.rdf4j.model.IRI; | ||
import org.eclipse.rdf4j.model.Literal; | ||
import org.eclipse.rdf4j.model.Statement; | ||
import org.eclipse.rdf4j.model.vocabulary.RDF; | ||
import org.eclipse.rdf4j.model.vocabulary.XSD; | ||
import org.opensearch.dataprepper.plugins.source.neptune.configuration.NeptuneSourceConfig; | ||
import org.opensearch.dataprepper.plugins.source.neptune.stream.StreamUtils; | ||
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.NeptuneStreamRecord; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.services.neptunedata.model.PropertygraphData; | ||
import software.amazon.awssdk.services.neptunedata.model.SparqlData; | ||
|
||
import java.io.IOException; | ||
|
||
|
||
/** | ||
* Validates if the record from Neptune Streams is a valid record. | ||
* (1) If {@link NeptuneSourceConfig#isEnableNonStringIndexing()} is true, then all datatypes are valid and mapped to | ||
* OS datatypes as defined in https://docs.aws.amazon.com/neptune/latest/userguide/full-text-search-non-string-indexing-mapping.html. | ||
* (2) Otherwise, only String datatypes are supported and any non-string record is dropped. | ||
*/ | ||
public class NeptuneStreamRecordValidator { | ||
private static final Logger LOG = LoggerFactory.getLogger(NeptuneStreamRecordValidator.class); | ||
private final boolean allowNonStringDatatypes; | ||
|
||
public NeptuneStreamRecordValidator(final boolean allowNonStringDatatypes) { | ||
this.allowNonStringDatatypes = allowNonStringDatatypes; | ||
} | ||
|
||
public boolean isValid(final NeptuneStreamRecord record) { | ||
if (record.getData() instanceof SparqlData) { | ||
return isValidSparqlRecord(record); | ||
} | ||
return isValidPropertyGraphRecord(record); | ||
} | ||
|
||
private boolean isValidPropertyGraphRecord(final NeptuneStreamRecord record) { | ||
if (allowNonStringDatatypes) { | ||
return true; | ||
} | ||
final PropertygraphData data = (PropertygraphData) record.getData(); | ||
final String datatype = data.value().asMap().get("dataType").asString(); | ||
return datatype.equalsIgnoreCase("String"); | ||
} | ||
|
||
private boolean isValidSparqlRecord(final NeptuneStreamRecord record) { | ||
if (allowNonStringDatatypes) { | ||
return true; | ||
} | ||
final SparqlData data = (SparqlData) record.getData(); | ||
try { | ||
final Statement statement = StreamUtils.parseSparqlStatement(data.stmt()); | ||
if (!statement.getObject().isLiteral()) { | ||
return false; | ||
} | ||
return isSparqlStringDatatype(((Literal) statement.getObject()).getDatatype()); | ||
} catch (IOException e) { | ||
LOG.error("Failed to parse Sparql statement, Skipping record: ", e); | ||
return false; | ||
} | ||
} | ||
|
||
private static boolean isSparqlStringDatatype(IRI datatype) { | ||
return XSD.STRING.equals(datatype) || RDF.LANGSTRING.equals(datatype); | ||
} | ||
} |
Oops, something went wrong.