diff --git a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneDataClientFactory.java b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneDataClientFactory.java index 36774f408a..55b3c9be4b 100644 --- a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneDataClientFactory.java +++ b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneDataClientFactory.java @@ -7,8 +7,12 @@ import org.opensearch.dataprepper.plugins.source.neptune.configuration.AwsConfig; import org.opensearch.dataprepper.plugins.source.neptune.configuration.NeptuneSourceConfig; import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.core.signer.NoOpSigner; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.neptunedata.NeptunedataClient; import software.amazon.awssdk.services.sts.StsClient; @@ -20,12 +24,28 @@ public class NeptuneDataClientFactory { public static NeptunedataClient provideNeptuneDataClient(final NeptuneSourceConfig sourceConfig) { - final AwsConfig awsConfig = sourceConfig.getAwsConfig(); - return NeptunedataClient - .builder() - .region(Region.of(sourceConfig.getRegion())) - .credentialsProvider(getAwsCredentials(Region.of(sourceConfig.getRegion()), awsConfig.getAwsStsRoleArn(), awsConfig.getAwsStsExternalId())) - .endpointOverride(URI.create(String.format("https://%s:%s", sourceConfig.getHost(), sourceConfig.getPort()))).build(); + final URI endpoint = URI.create(String.format("https://%s:%s", sourceConfig.getHost(), sourceConfig.getPort())); + if (sourceConfig.isIamAuth()) { + final AwsConfig awsConfig = sourceConfig.getAwsConfig(); + return NeptunedataClient.builder() + .endpointOverride(endpoint) + .region(Region.of(sourceConfig.getRegion())) + .credentialsProvider(getAwsCredentials(Region.of(sourceConfig.getRegion()), awsConfig.getAwsStsRoleArn(), awsConfig.getAwsStsExternalId())) + .build(); + } else { + final ClientOverrideConfiguration clientOverrideConfiguration = + // Do not sign the request + ClientOverrideConfiguration.builder() + .putAdvancedOption(SdkAdvancedClientOption.SIGNER, new NoOpSigner()) + .build(); + + return NeptunedataClient.builder() + .endpointOverride(endpoint) + .region(Region.of(sourceConfig.getRegion())) + .overrideConfiguration(clientOverrideConfiguration) + .credentialsProvider(AnonymousCredentialsProvider.create()) + .build(); + } } private static AwsCredentialsProvider getAwsCredentials( diff --git a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneStreamClient.java b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneStreamClient.java index 7fada08df5..622bf8f833 100644 --- a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneStreamClient.java +++ b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/client/NeptuneStreamClient.java @@ -38,7 +38,6 @@ public NeptuneStreamClient(final NeptuneSourceConfig config, final int batchSize this.retryCount = 0; } - public void setStreamPosition(final long commitNum, final long opNum) { streamPositionInfo = new StreamPosition(commitNum, opNum); } diff --git a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/configuration/NeptuneSourceConfig.java b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/configuration/NeptuneSourceConfig.java index 215fb641da..bcad2f9043 100644 --- a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/configuration/NeptuneSourceConfig.java +++ b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/configuration/NeptuneSourceConfig.java @@ -23,7 +23,7 @@ public class NeptuneSourceConfig { @JsonProperty("region") private String region; @JsonProperty("iam_auth") - private boolean iamAuth = false; + private boolean iamAuth; @JsonProperty("trust_store_file_path") private String trustStoreFilePath; diff --git a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/stream/StreamAcknowledgementManager.java b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/stream/StreamAcknowledgementManager.java index 1b34bd7fd6..cddb57e2cb 100644 --- a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/stream/StreamAcknowledgementManager.java +++ b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/stream/StreamAcknowledgementManager.java @@ -142,7 +142,9 @@ Optional createAcknowledgementSet(final StreamCheckpoint che } void shutdown() { - monitoringTask.cancel(true); + if (monitoringTask != null) { + monitoringTask.cancel(true); + } try { if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { this.executorService.shutdownNow(); diff --git a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/stream/StreamWorker.java b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/stream/StreamWorker.java index 9f11443a2d..56220d410f 100644 --- a/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/stream/StreamWorker.java +++ b/data-prepper-plugins/neptune-source/src/main/java/org/opensearch/dataprepper/plugins/source/neptune/stream/StreamWorker.java @@ -350,7 +350,7 @@ public boolean onNeptuneStreamException(final Exception exception, final StreamP LOG.warn("Stream is corrupt, stopping the worker and resetting the stream."); this.isUnrecoverableError = true; } else { - LOG.info("Error fetching stream data, stopping processing"); + LOG.info("Error fetching stream data, stopping processing: {}", exception.getMessage()); } return false; }