Skip to content

Commit

Permalink
Justin/Mehul - retrying when there are transient failures
Browse files Browse the repository at this point in the history
  • Loading branch information
mehul-ion committed Aug 12, 2024
1 parent cf1f992 commit e0e706c
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import com.timgroup.eventstore.archiver.monitoring.ComponentUtils;
import com.timgroup.eventsubscription.Event;
import com.timgroup.eventsubscription.EventHandler;
import com.timgroup.remotefilestorage.s3.S3UploadableStorageForInputStream;
import com.timgroup.remotefilestorage.s3.UploadStorage;
import com.timgroup.tucker.info.Component;
import com.timgroup.tucker.info.component.SimpleValueComponent;

import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.time.Clock;
import java.util.Arrays;
Expand All @@ -19,25 +20,28 @@
import java.util.Map;

import static com.timgroup.tucker.info.Status.INFO;
import static com.timgroup.tucker.info.Status.WARNING;
import static java.lang.String.format;

final class BatchingUploadHandler implements EventHandler {
private final S3UploadableStorageForInputStream output;
private final UploadStorage output;
private final Clock clock;
private final CurrentBatchWriter currentBatchWriter;

private final Map<String, String> appMetadata;
private final Timer s3UploadTimer;
private final SimpleValueComponent eventsAwaitingUploadComponent;
private final long uploadRetryDelay;
private final SimpleValueComponent lastUploadState;

BatchingUploadHandler(
S3UploadableStorageForInputStream uploadableStorage,
UploadStorage uploadableStorage,
CurrentBatchWriter currentBatchWriter,
Clock clock,
Map<String, String> appMetadata,
String monitoringPrefix,
Timer s3UploadTimer)
Timer s3UploadTimer,
long uploadRetryDelay)
{
this.output = uploadableStorage;
this.clock = clock;
Expand All @@ -46,6 +50,7 @@ final class BatchingUploadHandler implements EventHandler {
this.appMetadata = appMetadata;
this.s3UploadTimer = s3UploadTimer;
this.eventsAwaitingUploadComponent = new SimpleValueComponent(monitoringPrefix + "-events-awaiting-upload", "Number of events awaiting upload to archive");
this.uploadRetryDelay = uploadRetryDelay;
this.eventsAwaitingUploadComponent.updateValue(INFO, 0);
this.lastUploadState = new SimpleValueComponent(monitoringPrefix + "-last-upload-state", "Last upload to S3 Archive");
this.lastUploadState.updateValue(INFO, "Nothing uploaded yet");
Expand All @@ -61,12 +66,7 @@ public void apply(@Nonnull Position position, @Nonnull Event deserializedEvent)
try {
S3BatchObject s3BatchObject = currentBatchWriter.prepareBatchForUpload();

try (Timer.Context ignored = s3UploadTimer.time()) {
Map<String, String> allMetadata = new HashMap<>(appMetadata);
allMetadata.putAll(s3BatchObject.metadata);
output.upload(key, s3BatchObject.content, s3BatchObject.contentLength, allMetadata);
}
lastUploadState.updateValue(INFO, format("Successfully uploaded object=[%s] at [%s]", key, clock.instant()));
upload(s3BatchObject, key);

currentBatchWriter.reset();
} catch (IOException e) {
Expand All @@ -82,6 +82,29 @@ public void apply(@Nonnull Position position, @Nonnull Event deserializedEvent)
}
}

/**
 * Uploads the prepared batch to the archive, retrying indefinitely on failure.
 *
 * <p>The archiver cannot make progress past the current batch, so on any upload
 * failure we record a WARNING on the monitoring component, sleep for
 * {@code uploadRetryDelay} milliseconds, and try again until the upload succeeds.
 *
 * @param s3BatchObject the compressed batch content and its per-batch metadata
 * @param key           the object key the batch is stored under
 */
private void upload(S3BatchObject s3BatchObject, String key) {
    while (true) {
        try (Timer.Context ignored = s3UploadTimer.time()) {
            // Per-batch metadata is merged over the static application metadata.
            Map<String, String> allMetadata = new HashMap<>(appMetadata);
            allMetadata.putAll(s3BatchObject.metadata);
            // A fresh ByteArrayInputStream per attempt: a stream cannot be re-read
            // after a partially-failed upload, but the byte[] can be re-wrapped.
            output.upload(key,
                    new ByteArrayInputStream(s3BatchObject.content),
                    s3BatchObject.contentLength,
                    allMetadata);
            break;
        } catch (Exception e) {
            lastUploadState.updateValue(WARNING, format("Failed to upload object=[%s] at [%s]%n%s. Retrying.", key,
                    clock.instant(), ComponentUtils.getStackTraceAsString(e)));

            try {
                Thread.sleep(uploadRetryDelay);
            } catch (InterruptedException ex) {
                // Restore the interrupt status rather than swallowing it, so code
                // further up the stack can still observe the interruption request.
                Thread.currentThread().interrupt();
            }
        }
    }

    lastUploadState.updateValue(INFO, format("Successfully uploaded object=[%s] at [%s]", key, clock.instant()));
}

@SuppressWarnings("WeakerAccess")
public Collection<Component> monitoring() {
return Arrays.asList(eventsAwaitingUploadComponent, lastUploadState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public S3BatchObject prepareBatchForUpload() throws IOException {
compressedSizeMetrics.update(compressedContentSize);

return new S3BatchObject(
new ByteArrayInputStream(content),
content,
compressedContentSize,
buildObjectMetadata(uncompressedContentSize, compressedContentSize, lastEventInBatch));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ private S3Archiver(EventSource liveEventSource,
compressedSizeMetrics);
Timer s3UploadTimer = metricRegistry.timer(this.monitoringPrefix + ".archive.upload");

this.batchingUploadHandler = new BatchingUploadHandler(output, currentBatchWriter, clock, appMetadata, monitoringPrefix, s3UploadTimer);
this.batchingUploadHandler = new BatchingUploadHandler(output, currentBatchWriter, clock, appMetadata,
monitoringPrefix, s3UploadTimer, 30000);

this.eventSubscription = subscriptionBuilder
.readingFrom(liveEventSource.readAll(), convertPosition(maxPositionInArchiveOnStartup))
Expand Down Expand Up @@ -185,7 +186,7 @@ public static final class EventRecordHolder implements Event {
@SuppressWarnings("WeakerAccess")
public final EventRecord record;

private EventRecordHolder(EventRecord record) {
public EventRecordHolder(EventRecord record) {
this.record = record;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.timgroup.remotefilestorage.s3.S3ClientFactory;
import com.timgroup.remotefilestorage.s3.S3ListableStorage;
import com.timgroup.remotefilestorage.s3.S3StreamingDownloadableStorage;
import com.timgroup.remotefilestorage.s3.S3UploadableStorage;
import com.timgroup.remotefilestorage.s3.S3UploadableStorageForInputStream;
import com.timgroup.tucker.info.Component;

Expand Down Expand Up @@ -86,6 +85,6 @@ private S3ListableStorage createS3ListableStorage(AmazonS3 amazonS3) {
}

private S3UploadableStorageForInputStream createUploadableStorage() {
return new S3UploadableStorageForInputStream(new S3UploadableStorage(amazonS3, bucketName), amazonS3, bucketName);
return new S3UploadableStorageForInputStream(amazonS3, bucketName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import java.util.Map;

public final class S3BatchObject {
public final InputStream content;
public final byte[] content;
public final int contentLength;
public final Map<String, String> metadata;

public S3BatchObject(InputStream content, int contentLength, Map<String, String> metadata) {
public S3BatchObject(byte[] content, int contentLength, Map<String, String> metadata) {
this.content = content;
this.contentLength = contentLength;
this.metadata = metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,27 @@
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.timgroup.remotefilestorage.api.UploadableStorage;
import org.slf4j.Logger;

import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.util.Map;

import static org.slf4j.LoggerFactory.getLogger;

public class S3UploadableStorageForInputStream implements UploadableStorage {
public class S3UploadableStorageForInputStream implements UploadStorage {
private static final Logger LOG = getLogger(S3UploadableStorage.class);
private final S3UploadableStorage delegateFromLibrary;

private final AmazonS3 client;
private final String bucketName;

public S3UploadableStorageForInputStream(S3UploadableStorage delegateFromLibrary, AmazonS3 client, String bucketName) {
this.delegateFromLibrary = delegateFromLibrary;
public S3UploadableStorageForInputStream(AmazonS3 client, String bucketName) {

this.client = client;
this.bucketName = bucketName;
}

@Override
public URI upload(String name, InputStream content, long contentLength, Map<String, String> metaData) {
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setUserMetadata(metaData);
Expand All @@ -37,9 +36,4 @@ public URI upload(String name, InputStream content, long contentLength, Map<Stri
LOG.info("Response received, URI: " + uri);
return uri;
}


@Override public URI upload(String name, Path path, Map<String, String> metaData) {
return this.delegateFromLibrary.upload(name, path, metaData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.timgroup.remotefilestorage.s3;

import java.io.InputStream;
import java.net.URI;
import java.util.Map;

/**
 * Minimal abstraction over an object store that accepts streamed uploads.
 *
 * <p>Introduced so that {@code BatchingUploadHandler} can be exercised against an
 * in-memory implementation in tests instead of a real S3-backed storage.
 */
public interface UploadStorage {
    /**
     * Uploads {@code contentLength} bytes read from {@code content} under the given name.
     *
     * @param name          the key/name to store the object under
     * @param content       the object's bytes; fully consumed by the call
     * @param contentLength the number of bytes that will be read from {@code content}
     * @param metaData      user metadata to attach to the stored object
     * @return the URI of the stored object
     */
    URI upload(String name, InputStream content, long contentLength, Map<String, String> metaData);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.timgroup.eventstore.archiver;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.timgroup.eventstore.api.EventRecord;
import com.timgroup.eventstore.api.NewEvent;
import com.timgroup.eventstore.api.ResolvedEvent;
import com.timgroup.eventstore.api.StreamId;
import com.timgroup.eventstore.memory.InMemoryEventSource;
import com.timgroup.remotefilestorage.s3.UploadStorage;
import junit.framework.TestCase;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class BatchingUploadHandlerTest {

    private final InMemoryEventSource eventSource = new InMemoryEventSource();
    private final TestStorage storage = new TestStorage();

    // Handler under test: one-event batches so every applied event triggers an upload,
    // and a 1ms retry delay so the retry test completes quickly.
    private final BatchingUploadHandler handler = new BatchingUploadHandler(
            storage,
            new CurrentBatchWriter(
                    new FixedNumberOfEventsBatchingPolicy(1),
                    rs -> Long.parseLong(eventSource.readAll().storePositionCodec().serializePosition(rs.position())),
                    new S3ArchiveKeyFormat("test"),
                    new MetricRegistry().histogram("test"),
                    new MetricRegistry().histogram("test")),
            Clock.systemUTC(),
            new HashMap<>(),
            "",
            new MetricRegistry().timer("timer"),
            1);

    @Test
    public void uploads_when_ready() {
        writeSingleEvent();
        ResolvedEvent event = firstEvent();

        handler.apply(event.position(), new S3Archiver.EventRecordHolder(event.eventRecord()));

        assertThat(storage.count, is(1));
    }

    @Test
    public void retries_upload_on_failure() {
        // Five transient failures, then success: exactly one completed upload expected.
        storage.failNumberOfTimes(5);

        writeSingleEvent();
        ResolvedEvent event = firstEvent();

        handler.apply(event.position(), new S3Archiver.EventRecordHolder(event.eventRecord()));

        assertThat(storage.count, is(1));
    }

    // Writes a single "test" event so the one-event batching policy fires an upload.
    private void writeSingleEvent() {
        eventSource.writeStream().write(
                StreamId.streamId("cat", "id"),
                Arrays.asList(NewEvent.newEvent("test", "hello".getBytes(StandardCharsets.UTF_8))));
    }

    // Reads back the first (and only) event written to the in-memory store.
    private ResolvedEvent firstEvent() {
        return eventSource.readAll().readAllForwards().findFirst().get();
    }

    /** In-memory {@link UploadStorage} that can be told to fail a number of uploads first. */
    public static class TestStorage implements UploadStorage {
        int count = 0;
        private int failCount = 0;

        @Override
        public URI upload(String name, InputStream content, long contentLength, Map<String, String> metaData) {
            if (failCount > 0) {
                failCount--;
                throw new RuntimeException("Configured failure");
            }
            count++;
            try {
                return new URI("s3://nothing");
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }

        public void failNumberOfTimes(int failCount) {
            this.failCount = failCount;
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.timgroup.remotefilestorage.s3.S3ClientFactory;
import com.timgroup.remotefilestorage.s3.S3ListableStorage;
import com.timgroup.remotefilestorage.s3.S3StreamingDownloadableStorage;
import com.timgroup.remotefilestorage.s3.S3UploadableStorage;
import com.timgroup.remotefilestorage.s3.S3UploadableStorageForInputStream;
import com.timgroup.tucker.info.Component;
import com.timgroup.tucker.info.Report;
Expand Down Expand Up @@ -322,7 +321,7 @@ twoEventsPerBatch, new S3ArchiveMaxPositionFetcher(listableStorage, new S3Archiv
}

private S3UploadableStorageForInputStream createUploadableStorage() {
return new S3UploadableStorageForInputStream(new S3UploadableStorage(amazonS3, bucketName), amazonS3, bucketName);
return new S3UploadableStorageForInputStream(amazonS3, bucketName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.timgroup.remotefilestorage.s3.S3ClientFactory;
import com.timgroup.remotefilestorage.s3.S3ListableStorage;
import com.timgroup.remotefilestorage.s3.S3StreamingDownloadableStorage;
import com.timgroup.remotefilestorage.s3.S3UploadableStorage;
import com.timgroup.remotefilestorage.s3.S3UploadableStorageForInputStream;
import com.timgroup.tucker.info.Component;
import com.timgroup.tucker.info.Status;
Expand Down Expand Up @@ -471,6 +470,6 @@ twoEventsPerBatch, new S3ArchiveMaxPositionFetcher(listableStorage, new S3Archiv
}

private S3UploadableStorageForInputStream createUploadableStorage() {
return new S3UploadableStorageForInputStream(new S3UploadableStorage(amazonS3, bucketName), amazonS3, bucketName);
return new S3UploadableStorageForInputStream(amazonS3, bucketName);
}
}

0 comments on commit e0e706c

Please sign in to comment.