diff --git a/archiver/src/main/java/com/timgroup/eventstore/archiver/BatchingUploadHandler.java b/archiver/src/main/java/com/timgroup/eventstore/archiver/BatchingUploadHandler.java index 783c505f..05c53f6f 100644 --- a/archiver/src/main/java/com/timgroup/eventstore/archiver/BatchingUploadHandler.java +++ b/archiver/src/main/java/com/timgroup/eventstore/archiver/BatchingUploadHandler.java @@ -6,11 +6,12 @@ import com.timgroup.eventstore.archiver.monitoring.ComponentUtils; import com.timgroup.eventsubscription.Event; import com.timgroup.eventsubscription.EventHandler; -import com.timgroup.remotefilestorage.s3.S3UploadableStorageForInputStream; +import com.timgroup.remotefilestorage.s3.UploadStorage; import com.timgroup.tucker.info.Component; import com.timgroup.tucker.info.component.SimpleValueComponent; import javax.annotation.Nonnull; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.time.Clock; import java.util.Arrays; @@ -19,25 +20,28 @@ import java.util.Map; import static com.timgroup.tucker.info.Status.INFO; +import static com.timgroup.tucker.info.Status.WARNING; import static java.lang.String.format; final class BatchingUploadHandler implements EventHandler { - private final S3UploadableStorageForInputStream output; + private final UploadStorage output; private final Clock clock; private final CurrentBatchWriter currentBatchWriter; private final Map appMetadata; private final Timer s3UploadTimer; private final SimpleValueComponent eventsAwaitingUploadComponent; + private final long uploadRetryDelay; private final SimpleValueComponent lastUploadState; BatchingUploadHandler( - S3UploadableStorageForInputStream uploadableStorage, + UploadStorage uploadableStorage, CurrentBatchWriter currentBatchWriter, Clock clock, Map appMetadata, String monitoringPrefix, - Timer s3UploadTimer) + Timer s3UploadTimer, + long uploadRetryDelay) { this.output = uploadableStorage; this.clock = clock; @@ -46,6 +50,7 @@ final class BatchingUploadHandler 
implements EventHandler { this.appMetadata = appMetadata; this.s3UploadTimer = s3UploadTimer; this.eventsAwaitingUploadComponent = new SimpleValueComponent(monitoringPrefix + "-events-awaiting-upload", "Number of events awaiting upload to archive"); + this.uploadRetryDelay = uploadRetryDelay; this.eventsAwaitingUploadComponent.updateValue(INFO, 0); this.lastUploadState = new SimpleValueComponent(monitoringPrefix + "-last-upload-state", "Last upload to S3 Archive"); this.lastUploadState.updateValue(INFO, "Nothing uploaded yet"); @@ -61,12 +66,7 @@ public void apply(@Nonnull Position position, @Nonnull Event deserializedEvent) try { S3BatchObject s3BatchObject = currentBatchWriter.prepareBatchForUpload(); - try (Timer.Context ignored = s3UploadTimer.time()) { - Map allMetadata = new HashMap<>(appMetadata); - allMetadata.putAll(s3BatchObject.metadata); - output.upload(key, s3BatchObject.content, s3BatchObject.contentLength, allMetadata); - } - lastUploadState.updateValue(INFO, format("Successfully uploaded object=[%s] at [%s]", key, clock.instant())); + upload(s3BatchObject, key); currentBatchWriter.reset(); } catch (IOException e) { @@ -82,6 +82,29 @@ public void apply(@Nonnull Position position, @Nonnull Event deserializedEvent) } } + private void upload(S3BatchObject s3BatchObject, String key) { + while (true) { + try (Timer.Context ignored = s3UploadTimer.time()) { + Map allMetadata = new HashMap<>(appMetadata); + allMetadata.putAll(s3BatchObject.metadata); + output.upload(key, + new ByteArrayInputStream(s3BatchObject.content), + s3BatchObject.contentLength, + allMetadata); + break; + } catch (Exception e) { + lastUploadState.updateValue(WARNING, format("Failed to upload object=[%s] at [%s]%n%s. 
Retrying.", key, + clock.instant(), ComponentUtils.getStackTraceAsString(e))); + + try { + Thread.sleep(uploadRetryDelay); + } catch (InterruptedException ex) { + // Restore interrupt status and abort: swallowing the interrupt here would make the while(true) retry loop unstoppable. + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting to retry upload of " + key, ex); + } + } + } + + lastUploadState.updateValue(INFO, format("Successfully uploaded object=[%s] at [%s]", key, clock.instant())); + } + + @SuppressWarnings("WeakerAccess") public Collection monitoring() { return Arrays.asList(eventsAwaitingUploadComponent, lastUploadState); diff --git a/archiver/src/main/java/com/timgroup/eventstore/archiver/CurrentBatchWriter.java b/archiver/src/main/java/com/timgroup/eventstore/archiver/CurrentBatchWriter.java index de4315cb..62efd8a1 100644 --- a/archiver/src/main/java/com/timgroup/eventstore/archiver/CurrentBatchWriter.java +++ b/archiver/src/main/java/com/timgroup/eventstore/archiver/CurrentBatchWriter.java @@ -110,7 +110,7 @@ public S3BatchObject prepareBatchForUpload() throws IOException { compressedSizeMetrics.update(compressedContentSize); return new S3BatchObject( - new ByteArrayInputStream(content), + content, compressedContentSize, buildObjectMetadata(uncompressedContentSize, compressedContentSize, lastEventInBatch)); } diff --git a/archiver/src/main/java/com/timgroup/eventstore/archiver/S3Archiver.java b/archiver/src/main/java/com/timgroup/eventstore/archiver/S3Archiver.java index fac26cf0..fd9d50b4 100644 --- a/archiver/src/main/java/com/timgroup/eventstore/archiver/S3Archiver.java +++ b/archiver/src/main/java/com/timgroup/eventstore/archiver/S3Archiver.java @@ -92,7 +92,8 @@ private S3Archiver(EventSource liveEventSource, compressedSizeMetrics); Timer s3UploadTimer = metricRegistry.timer(this.monitoringPrefix + ".archive.upload"); - this.batchingUploadHandler = new BatchingUploadHandler(output, currentBatchWriter, clock, appMetadata, monitoringPrefix, s3UploadTimer); + this.batchingUploadHandler = new BatchingUploadHandler(output, currentBatchWriter, clock, appMetadata, + monitoringPrefix, s3UploadTimer, 30000); this.eventSubscription = subscriptionBuilder
.readingFrom(liveEventSource.readAll(), convertPosition(maxPositionInArchiveOnStartup)) @@ -185,7 +186,7 @@ public static final class EventRecordHolder implements Event { @SuppressWarnings("WeakerAccess") public final EventRecord record; - private EventRecordHolder(EventRecord record) { + public EventRecordHolder(EventRecord record) { this.record = record; } } diff --git a/archiver/src/main/java/com/timgroup/eventstore/archiver/S3ArchiverFactory.java b/archiver/src/main/java/com/timgroup/eventstore/archiver/S3ArchiverFactory.java index 10b43268..90557f45 100644 --- a/archiver/src/main/java/com/timgroup/eventstore/archiver/S3ArchiverFactory.java +++ b/archiver/src/main/java/com/timgroup/eventstore/archiver/S3ArchiverFactory.java @@ -8,7 +8,6 @@ import com.timgroup.remotefilestorage.s3.S3ClientFactory; import com.timgroup.remotefilestorage.s3.S3ListableStorage; import com.timgroup.remotefilestorage.s3.S3StreamingDownloadableStorage; -import com.timgroup.remotefilestorage.s3.S3UploadableStorage; import com.timgroup.remotefilestorage.s3.S3UploadableStorageForInputStream; import com.timgroup.tucker.info.Component; @@ -86,6 +85,6 @@ private S3ListableStorage createS3ListableStorage(AmazonS3 amazonS3) { } private S3UploadableStorageForInputStream createUploadableStorage() { - return new S3UploadableStorageForInputStream(new S3UploadableStorage(amazonS3, bucketName), amazonS3, bucketName); + return new S3UploadableStorageForInputStream(amazonS3, bucketName); } } diff --git a/archiver/src/main/java/com/timgroup/eventstore/archiver/S3BatchObject.java b/archiver/src/main/java/com/timgroup/eventstore/archiver/S3BatchObject.java index aa9aad28..2cc16a8e 100644 --- a/archiver/src/main/java/com/timgroup/eventstore/archiver/S3BatchObject.java +++ b/archiver/src/main/java/com/timgroup/eventstore/archiver/S3BatchObject.java @@ -4,11 +4,11 @@ import java.util.Map; public final class S3BatchObject { - public final InputStream content; + public final byte[] content; public final int 
contentLength; public final Map metadata; - public S3BatchObject(InputStream content, int contentLength, Map metadata) { + public S3BatchObject(byte[] content, int contentLength, Map metadata) { this.content = content; this.contentLength = contentLength; this.metadata = metadata; diff --git a/archiver/src/main/java/com/timgroup/remotefilestorage/s3/S3UploadableStorageForInputStream.java b/archiver/src/main/java/com/timgroup/remotefilestorage/s3/S3UploadableStorageForInputStream.java index a31f8ab6..26863013 100644 --- a/archiver/src/main/java/com/timgroup/remotefilestorage/s3/S3UploadableStorageForInputStream.java +++ b/archiver/src/main/java/com/timgroup/remotefilestorage/s3/S3UploadableStorageForInputStream.java @@ -4,28 +4,27 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; -import com.timgroup.remotefilestorage.api.UploadableStorage; import org.slf4j.Logger; import java.io.InputStream; import java.net.URI; -import java.nio.file.Path; import java.util.Map; import static org.slf4j.LoggerFactory.getLogger; -public class S3UploadableStorageForInputStream implements UploadableStorage { +public class S3UploadableStorageForInputStream implements UploadStorage { private static final Logger LOG = getLogger(S3UploadableStorage.class); - private final S3UploadableStorage delegateFromLibrary; + private final AmazonS3 client; private final String bucketName; - public S3UploadableStorageForInputStream(S3UploadableStorage delegateFromLibrary, AmazonS3 client, String bucketName) { - this.delegateFromLibrary = delegateFromLibrary; + public S3UploadableStorageForInputStream(AmazonS3 client, String bucketName) { + this.client = client; this.bucketName = bucketName; } + @Override public URI upload(String name, InputStream content, long contentLength, Map metaData) { ObjectMetadata objectMetadata = new ObjectMetadata(); objectMetadata.setUserMetadata(metaData); 
@@ -37,9 +36,4 @@ public URI upload(String name, InputStream content, long contentLength, Map metaData) { - return this.delegateFromLibrary.upload(name, path, metaData); - } } diff --git a/archiver/src/main/java/com/timgroup/remotefilestorage/s3/UploadStorage.java b/archiver/src/main/java/com/timgroup/remotefilestorage/s3/UploadStorage.java new file mode 100644 index 00000000..674ca8be --- /dev/null +++ b/archiver/src/main/java/com/timgroup/remotefilestorage/s3/UploadStorage.java @@ -0,0 +1,9 @@ +package com.timgroup.remotefilestorage.s3; + +import java.io.InputStream; +import java.net.URI; +import java.util.Map; + +public interface UploadStorage { + URI upload(String name, InputStream content, long contentLength, Map metaData); +} diff --git a/archiver/src/test/java/com/timgroup/eventstore/archiver/BatchingUploadHandlerTest.java b/archiver/src/test/java/com/timgroup/eventstore/archiver/BatchingUploadHandlerTest.java new file mode 100644 index 00000000..0b58b892 --- /dev/null +++ b/archiver/src/test/java/com/timgroup/eventstore/archiver/BatchingUploadHandlerTest.java @@ -0,0 +1,94 @@ +package com.timgroup.eventstore.archiver; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.timgroup.eventstore.api.EventRecord; +import com.timgroup.eventstore.api.NewEvent; +import com.timgroup.eventstore.api.ResolvedEvent; +import com.timgroup.eventstore.api.StreamId; +import com.timgroup.eventstore.memory.InMemoryEventSource; +import com.timgroup.remotefilestorage.s3.UploadStorage; +import junit.framework.TestCase; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.time.Clock; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static 
org.hamcrest.Matchers.is; + +public class BatchingUploadHandlerTest { + private final InMemoryEventSource store = new InMemoryEventSource(); + private final TestStorage uploadableStorage = new TestStorage(); + private final BatchingUploadHandler handler = new BatchingUploadHandler(uploadableStorage, + new CurrentBatchWriter(new FixedNumberOfEventsBatchingPolicy(1), + rs -> Long.parseLong(store.readAll().storePositionCodec().serializePosition(rs.position())), + new S3ArchiveKeyFormat("test"), + new MetricRegistry().histogram("test"), + new MetricRegistry().histogram("test")), + Clock.systemUTC(), + new HashMap<>(), + "", + new MetricRegistry().timer("timer"), + 1 + ); + + @Test + public void uploads_when_ready() { + store.writeStream().write(StreamId.streamId("cat", "id"), Arrays.asList( + NewEvent.newEvent("test", "hello".getBytes(StandardCharsets.UTF_8)) + )); + ResolvedEvent evt = store.readAll().readAllForwards().findFirst().get(); + + handler.apply(evt.position(), new S3Archiver.EventRecordHolder(evt.eventRecord())); + + assertThat(uploadableStorage.count, is(1)); + } + + @Test + public void retries_upload_on_failure() { + uploadableStorage.failNumberOfTimes(5); + + store.writeStream().write(StreamId.streamId("cat", "id"), Arrays.asList( + NewEvent.newEvent("test", "hello".getBytes(StandardCharsets.UTF_8)) + )); + ResolvedEvent evt = store.readAll().readAllForwards().findFirst().get(); + + handler.apply(evt.position(), new S3Archiver.EventRecordHolder(evt.eventRecord())); + + assertThat(uploadableStorage.count, is(1)); + } + + public static class TestStorage implements UploadStorage { + int count = 0; + private int failCount = 0; + + @Override + public URI upload(String name, InputStream content, long contentLength, Map metaData) { + if (failCount == 0) { + count++; + try { + return new URI("s3://nothing"); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + failCount--; + throw new RuntimeException("Configured failure"); + } + + 
public void failNumberOfTimes(int failCount) { + this.failCount = failCount; + } + } + +} \ No newline at end of file diff --git a/archiver/src/test/java/com/timgroup/eventstore/archiver/S3ArchiveEventSourceIntegrationTest.java b/archiver/src/test/java/com/timgroup/eventstore/archiver/S3ArchiveEventSourceIntegrationTest.java index 98d95535..14b9006a 100644 --- a/archiver/src/test/java/com/timgroup/eventstore/archiver/S3ArchiveEventSourceIntegrationTest.java +++ b/archiver/src/test/java/com/timgroup/eventstore/archiver/S3ArchiveEventSourceIntegrationTest.java @@ -15,7 +15,6 @@ import com.timgroup.remotefilestorage.s3.S3ClientFactory; import com.timgroup.remotefilestorage.s3.S3ListableStorage; import com.timgroup.remotefilestorage.s3.S3StreamingDownloadableStorage; -import com.timgroup.remotefilestorage.s3.S3UploadableStorage; import com.timgroup.remotefilestorage.s3.S3UploadableStorageForInputStream; import com.timgroup.tucker.info.Component; import com.timgroup.tucker.info.Report; @@ -322,7 +321,7 @@ twoEventsPerBatch, new S3ArchiveMaxPositionFetcher(listableStorage, new S3Archiv } private S3UploadableStorageForInputStream createUploadableStorage() { - return new S3UploadableStorageForInputStream(new S3UploadableStorage(amazonS3, bucketName), amazonS3, bucketName); + return new S3UploadableStorageForInputStream(amazonS3, bucketName); } } diff --git a/archiver/src/test/java/com/timgroup/eventstore/archiver/S3ArchiverIntegrationTest.java b/archiver/src/test/java/com/timgroup/eventstore/archiver/S3ArchiverIntegrationTest.java index 319f0f48..537d0f6a 100644 --- a/archiver/src/test/java/com/timgroup/eventstore/archiver/S3ArchiverIntegrationTest.java +++ b/archiver/src/test/java/com/timgroup/eventstore/archiver/S3ArchiverIntegrationTest.java @@ -20,7 +20,6 @@ import com.timgroup.remotefilestorage.s3.S3ClientFactory; import com.timgroup.remotefilestorage.s3.S3ListableStorage; import com.timgroup.remotefilestorage.s3.S3StreamingDownloadableStorage; -import 
com.timgroup.remotefilestorage.s3.S3UploadableStorage; import com.timgroup.remotefilestorage.s3.S3UploadableStorageForInputStream; import com.timgroup.tucker.info.Component; import com.timgroup.tucker.info.Status; @@ -471,6 +470,6 @@ twoEventsPerBatch, new S3ArchiveMaxPositionFetcher(listableStorage, new S3Archiv } private S3UploadableStorageForInputStream createUploadableStorage() { - return new S3UploadableStorageForInputStream(new S3UploadableStorage(amazonS3, bucketName), amazonS3, bucketName); + return new S3UploadableStorageForInputStream(amazonS3, bucketName); } }