Skip to content

Commit

Permalink
Size computation mode (#1093)
Browse files Browse the repository at this point in the history
* stricter idempotent import executor api

* size calculation mode

* fixed unused import

* Update VideoModel.java

Fix JSON override

Co-authored-by: William Morland <[email protected]>
  • Loading branch information
xokker and wmorland authored Jun 18, 2022
1 parent 1fdf324 commit 46aac50
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 7 deletions.
6 changes: 5 additions & 1 deletion portability-spi-cloud/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ dependencies {
compile("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}")
compile("com.google.auto.value:auto-value:${autoValueVersion}")
compile("com.google.inject:guice:${guiceVersion}")

testCompile("junit:junit:${junitVersion}")
testCompile("org.mockito:mockito-core:${mockitoVersion}")
testCompile("com.google.truth:truth:${truthVersion}")
}

sourceSets {
Expand All @@ -43,4 +47,4 @@ sourceSets {
}
}

configurePublication(project)
configurePublication(project)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.datatransferproject.spi.cloud.connection;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.UUID;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.types.common.DownloadableItem;

public class ConnectionProvider {

private final TemporaryPerJobDataStore jobStore;

public ConnectionProvider(TemporaryPerJobDataStore jobStore) {
this.jobStore = jobStore;
}

public InputStreamWrapper getInputStreamForItem(UUID jobId, DownloadableItem item)
throws IOException {

String fetchableUrl = item.getFetchableUrl();
if (item.isInTempStore()) {
return jobStore.getStream(jobId, fetchableUrl);
}

HttpURLConnection conn = getConnection(fetchableUrl);
return new InputStreamWrapper(
conn.getInputStream(), Math.max(conn.getContentLengthLong(), 0));
}

public static HttpURLConnection getConnection(String urlStr) throws IOException {
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
return conn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ default Map<String, Integer> getCounts(UUID jobId) {
*/
default void addBytes(UUID jobId, Long bytes) throws IOException {}

/**
* Increments the bytes count for downloadable items of the given job.
*
* @param bytes key is idempotent id of a DownloadableItem
*/
default void addBytes(UUID jobId, Map<String, Long> bytes) {}

/** Provides the total number of bytes transferred. */
default Long getBytes(UUID jobId) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public abstract class PortabilityJob {
private static final String EXPORT_ENCRYPTED_INITIAL_AUTH_DATA =
"EXPORT_ENCRYPTED_INITIAL_AUTH_DATA";
private static final String JOB_STATE = "JOB_STATE";
private static final String TRANSFER_MODE = "TRANSFER_MODE";
private static final String FAILURE_REASON = "FAILURE_REASON";
private static final String NUMBER_OF_FAILED_FILES_KEY = "NUM_FAILED_FILES";
private static final String USER_TIMEZONE = "USER_TIMEZONE";
Expand Down Expand Up @@ -94,6 +95,11 @@ public static PortabilityJob fromMap(Map<String, Object> properties) {
String userLocale =
properties.containsKey(USER_LOCALE) ? (String) properties.get(USER_LOCALE) : null;

TransferMode transferMode =
properties.containsKey(TRANSFER_MODE)
? TransferMode.valueOf((String) properties.get(TRANSFER_MODE))
: TransferMode.DATA_TRANSFER;

return PortabilityJob.builder()
.setState(state)
.setExportService((String) properties.get(EXPORT_SERVICE_KEY))
Expand All @@ -117,6 +123,7 @@ public static PortabilityJob fromMap(Map<String, Object> properties) {
.build())
.setUserTimeZone(userTimeZone)
.setUserLocale(userLocale)
.setTransferMode(transferMode)
.build();
}

Expand Down Expand Up @@ -171,6 +178,10 @@ private static void isSet(String... strings) {
@JsonProperty("userLocale")
public abstract String userLocale();

@Nullable
@JsonProperty("transferMode")
public abstract TransferMode transferMode();

public abstract PortabilityJob.Builder toBuilder();

public Map<String, Object> toMap() {
Expand Down Expand Up @@ -229,6 +240,10 @@ public Map<String, Object> toMap() {
builder.put(USER_LOCALE, userLocale());
}

if (null != transferMode()) {
builder.put(TRANSFER_MODE, transferMode().toString());
}

return builder.build();
}

Expand All @@ -242,6 +257,19 @@ public enum State {
PREEMPTED
}

public enum TransferMode {
/**
* Regular data transfer mode: export data from a service, then import into another service.
*/
DATA_TRANSFER,

/**
* Do not import the data. Instead, compute the size of every exported item and report the sizes
* to the job store.
*/
SIZE_CALCULATION
}

@AutoValue.Builder
public abstract static class Builder {
@JsonCreator
Expand Down Expand Up @@ -306,6 +334,10 @@ public Builder setAndValidateJobAuthorization(JobAuthorization jobAuthorization)
@JsonProperty("userLocale")
public abstract Builder setUserLocale(String locale);

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("transferMode")
public abstract Builder setTransferMode(TransferMode transferMode);

// For internal use only; clients should use setAndValidateJobAuthorization
protected abstract Builder setJobAuthorization(JobAuthorization jobAuthorization);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.datatransferproject.spi.cloud.connection;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.truth.Truth;
import java.util.UUID;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.types.common.DownloadableItem;
import org.datatransferproject.types.common.models.photos.PhotoModel;
import org.junit.Before;
import org.junit.Test;

public class ConnectionProviderTest {

private TemporaryPerJobDataStore jobStore;
private ConnectionProvider connectionProvider;

@Before
public void setUp() throws Exception {
jobStore = mock(TemporaryPerJobDataStore.class);
connectionProvider = new ConnectionProvider(jobStore);
}

@Test
public void getInputStreamFromTempStore() throws Exception {
long expectedBytes = 323;
when(jobStore.getStream(any(), anyString())).thenReturn(
new InputStreamWrapper(null, expectedBytes));
boolean inTempStore = true;
String fetchableUrl = "https://example.com";
DownloadableItem item = new PhotoModel("title", fetchableUrl, "description", "jpeg",
"123", "album", inTempStore);
UUID jobId = UUID.randomUUID();
InputStreamWrapper streamWrapper = connectionProvider.getInputStreamForItem(
jobId, item);

Truth.assertThat(streamWrapper.getBytes()).isEqualTo(expectedBytes);
verify(jobStore).getStream(eq(jobId), eq(fetchableUrl));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.datatransferproject.transfer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.datatransferproject.spi.cloud.connection.ConnectionProvider;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.types.common.DownloadableItem;

public class CallableSizeCalculator implements Callable<Map<String, Long>> {

private final UUID jobId;
private final ConnectionProvider connectionProvider;
private final Collection<? extends DownloadableItem> items;

public CallableSizeCalculator(
UUID jobId, ConnectionProvider connectionProvider, Collection<? extends DownloadableItem> items) {
this.jobId = Objects.requireNonNull(jobId);
this.connectionProvider = Objects.requireNonNull(connectionProvider);
this.items = Objects.requireNonNull(items);
}

@Override
public Map<String, Long> call() throws Exception {
Map<String, Long> result = new LinkedHashMap<>();
for (DownloadableItem item : items) {
InputStreamWrapper stream = connectionProvider.getInputStreamForItem(jobId, item);
long size = stream.getBytes();
if (size <= 0) {
size = computeSize(stream);
}

result.put(item.getIdempotentId(), size);
}

return result;
}

// Reads the input stream in full
private Long computeSize(InputStreamWrapper stream) throws IOException {
long size = 0;
try (InputStream inStream = stream.getStream()) {
byte[] buffer = new byte[1024 * 1024]; // 1MB
int chunkBytesRead;
while ((chunkBytesRead = inStream.read(buffer)) != -1) {
size += chunkBytesRead;
}
}

return size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.datatransferproject.api.launcher.DtpInternalMetricRecorder;
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.launcher.monitor.events.EventCode;
import org.datatransferproject.spi.cloud.connection.ConnectionProvider;
import org.datatransferproject.spi.cloud.storage.JobStore;
import org.datatransferproject.spi.cloud.types.PortabilityJob;
import org.datatransferproject.spi.cloud.types.PortabilityJob.TransferMode;
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor;
import org.datatransferproject.spi.transfer.provider.ExportResult;
import org.datatransferproject.spi.transfer.provider.Exporter;
Expand All @@ -35,9 +39,13 @@
import org.datatransferproject.spi.transfer.types.CopyExceptionWithFailureReason;
import org.datatransferproject.transfer.CallableExporter;
import org.datatransferproject.transfer.CallableImporter;
import org.datatransferproject.transfer.CallableSizeCalculator;
import org.datatransferproject.transfer.JobMetadata;
import org.datatransferproject.types.common.DownloadableItem;
import org.datatransferproject.types.common.ExportInformation;
import org.datatransferproject.types.common.models.DataModel;
import org.datatransferproject.types.common.models.photos.PhotosContainerResource;
import org.datatransferproject.types.common.models.videos.VideosContainerResource;
import org.datatransferproject.types.transfer.auth.AuthData;
import org.datatransferproject.types.transfer.errors.ErrorDetail;
import org.datatransferproject.types.transfer.retry.RetryException;
Expand Down Expand Up @@ -101,7 +109,21 @@ protected ExportResult<?> copyIteration(

DataModel exportedData = exportResult.getExportedData();
if (exportedData != null) {
importIteration(jobId, importAuthData, jobIdPrefix, copyIteration, exportedData);
PortabilityJob job = jobStore.findJob(jobId);
TransferMode transferMode =
job.transferMode() == null ? TransferMode.DATA_TRANSFER : job.transferMode();
switch (transferMode) {
case DATA_TRANSFER:
importIteration(jobId, importAuthData, jobIdPrefix, copyIteration, exportedData);
break;
case SIZE_CALCULATION:
sizeCalculationIteration(jobId, jobIdPrefix, exportedData);
break;
default:
throw new IllegalStateException(
"Job mode " + transferMode.name() + " is not supported by "
+ getClass().getSimpleName());
}
}

return exportResult;
Expand Down Expand Up @@ -205,6 +227,34 @@ private void importIteration(
}
}

private void sizeCalculationIteration(UUID jobId, String jobIdPrefix,
DataModel exportedData) throws CopyException {
Collection<? extends DownloadableItem> items;
if (exportedData instanceof PhotosContainerResource) {
items = ((PhotosContainerResource) exportedData).getPhotos();
} else if (exportedData instanceof VideosContainerResource) {
items = ((VideosContainerResource) exportedData).getVideos();
} else {
return;
}

CallableSizeCalculator callableSizeCalculator =
new CallableSizeCalculator(jobId, new ConnectionProvider(jobStore), items);
try {
RetryingCallable<Map<String, Long>> retryingImporter =
new RetryingCallable<>(
callableSizeCalculator,
retryStrategyLibraryProvider.get(),
Clock.systemUTC(),
monitor,
JobMetadata.getDataType(),
JobMetadata.getImportService());
jobStore.addBytes(jobId, retryingImporter.call());
} catch (RetryException | RuntimeException e) {
throw convertToCopyException(jobIdPrefix, "size estimation", e);
}
}

private CopyException convertToCopyException(String jobIdPrefix, String suffix, Exception e) {
if (e.getClass() == RetryException.class
&& CopyExceptionWithFailureReason.class.isAssignableFrom(e.getCause().getClass())) {
Expand Down
Loading

0 comments on commit 46aac50

Please sign in to comment.