Skip to content

Commit

Permalink
Skip copy if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
ssyssy committed Jul 31, 2023
1 parent 043860b commit 2ed4307
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 64 deletions.
1 change: 1 addition & 0 deletions common/transport/src/main/proto/grpc/block_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ message RouteFailure {
// A developer-facing error message
optional string message = 3;
optional bool retryable = 4;
optional bool is_skip = 5;
}

// next available id: 2
Expand Down
6 changes: 6 additions & 0 deletions dora/core/common/src/main/java/alluxio/metrics/MetricKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,12 @@ public static String getSyncMetricName(long mountId) {
.setDescription("The number of files failed to be copied by copy commands")
.setMetricType(MetricType.COUNTER)
.build();

public static final MetricKey MASTER_JOB_COPY_SKIP_FILE_COUNT =
new Builder("Master.JobCopySkipFileCount")
.setDescription("The number of files skipped to be copied by copy commands")
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey MASTER_JOB_COPY_SIZE =
new Builder("Master.JobCopyFileSize")
.setDescription("The total block size copied by copy commands")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class CopyJob extends AbstractJob<CopyJob.CopyTask> {
private final AtomicLong mTotalByteCount = new AtomicLong();
private final AtomicLong mTotalFailureCount = new AtomicLong();
private final AtomicLong mCurrentFailureCount = new AtomicLong();
private final AtomicLong mCurrentSkipCount = new AtomicLong();
private Optional<AlluxioRuntimeException> mFailedReason = Optional.empty();
private final Iterable<FileInfo> mFileIterable;
private Optional<Iterator<FileInfo>> mFileIterator = Optional.empty();
Expand Down Expand Up @@ -377,6 +378,16 @@ public void addFailure(String src, String message, int code) {
COPY_FAIL_FILE_COUNT.inc();
}

/**
* Add a skip to metrics.
*
*/
@VisibleForTesting
public void addSkip() {
mCurrentSkipCount.incrementAndGet();
COPY_SKIP_FILE_COUNT.inc();
}

private Route buildRoute(FileInfo sourceFile) {
String relativePath;
try {
Expand Down Expand Up @@ -465,9 +476,12 @@ public boolean processResponse(CopyTask task) {
for (RouteFailure status : response.getFailuresList()) {
totalBytes -= status.getRoute().getLength();
if (!isHealthy() || !status.getRetryable() || !addToRetry(
status.getRoute())) {
status.getRoute()) || !status.getIsSkip()) {
addFailure(status.getRoute().getSrc(), status.getMessage(), status.getCode());
}
if (status.getIsSkip()) {
addSkip();
}
}
}
addCopiedBytes(totalBytes);
Expand Down Expand Up @@ -576,11 +590,14 @@ private static class CopyProgressReport {
private final double mFailurePercentage;
private final AlluxioRuntimeException mFailureReason;
private final long mFailedFileCount;
private final long mSkippedFileCount;
private final Map<String, String> mFailedFilesWithReasons;
private final String mJobId;

public CopyProgressReport(CopyJob job, boolean verbose)
{
mVerbose = verbose;
mJobId = job.mJobId;
mJobState = job.mState;
mCheckContent = job.mCheckContent;
mProcessedFileCount = job.mProcessedFileCount.get();
Expand Down Expand Up @@ -609,6 +626,7 @@ public CopyProgressReport(CopyJob job, boolean verbose)
}
mFailureReason = job.mFailedReason.orElse(null);
mFailedFileCount = job.mFailedFiles.size();
mSkippedFileCount = job.mCurrentSkipCount.get();
if (verbose && mFailedFileCount > 0) {
mFailedFilesWithReasons = job.mFailedFiles;
} else {
Expand All @@ -633,6 +651,7 @@ private String getTextReport() {
StringBuilder progress = new StringBuilder();
progress.append(
format("\tSettings:\tcheck-content: %s%n", mCheckContent));
progress.append(format("\tJob Id: %s%n", mJobId));
progress.append(format("\tJob State: %s%s%n", mJobState,
mFailureReason == null
? "" : format(
Expand All @@ -644,17 +663,19 @@ private String getTextReport() {
progress.append(format("\t\t%s%n", stack.toString()));
}
}
progress.append(format("\tFiles Processed: %d%n", mProcessedFileCount));
progress.append(format("\tBytes Copied: %s%s%n",
FormatUtils.getSizeFromBytes(mByteCount),
progress.append(format("\tFiles qualified so far: %d%s%n", mProcessedFileCount,
mTotalByteCount == null
? "" : format(" out of %s", FormatUtils.getSizeFromBytes(mTotalByteCount))));
? "" : format(", %s", FormatUtils.getSizeFromBytes(mTotalByteCount))));
progress.append(format("\tFiles Failed: %s%n", mFailedFileCount));
progress.append(format("\tFiles Skipped: %s%n", mSkippedFileCount));
progress.append(format("\tFiles Succeeded: %s%n", mProcessedFileCount
- mFailedFileCount - mSkippedFileCount));
progress.append(format("\tBytes Copied: %s%n", FormatUtils.getSizeFromBytes(mByteCount)));
if (mThroughput != null) {
progress.append(format("\tThroughput: %s/s%n",
FormatUtils.getSizeFromBytes(mThroughput)));
}
progress.append(format("\tFiles failure rate: %.2f%%%n", mFailurePercentage));
progress.append(format("\tFiles Failed: %s%n", mFailedFileCount));
if (mVerbose && !mFailedFilesWithReasons.isEmpty()) {
mFailedFilesWithReasons.forEach((fileName, reason) ->
progress.append(format("\t\t%s: %s%n", fileName, reason)));
Expand Down Expand Up @@ -683,6 +704,8 @@ private String getJsonReport() {
MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_FILE_COUNT.getName());
public static final Counter COPY_FAIL_FILE_COUNT =
MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_FAIL_FILE_COUNT.getName());
public static final Counter COPY_SKIP_FILE_COUNT =
MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_SKIP_FILE_COUNT.getName());
public static final Counter COPY_SIZE =
MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_SIZE.getName());
public static final Meter COPY_RATE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,17 +647,17 @@ private String getTextReport() {
progress.append(format("\t\t%s%n", stack.toString()));
}
}
progress.append(format("\tFiles Processed: %d%n", mProcessedFileCount));
progress.append(format("\tBytes Moved: %s%s%n",
FormatUtils.getSizeFromBytes(mByteCount),
progress.append(format("\tFiles qualified so far: %d%s%n", mProcessedFileCount,
mTotalByteCount == null
? "" : format(" out of %s", FormatUtils.getSizeFromBytes(mTotalByteCount))));
? "" : format(", %s", FormatUtils.getSizeFromBytes(mTotalByteCount))));
progress.append(format("\tFiles Failed: %s%n", mFailedFileCount));
progress.append(format("\tFiles Succeeded: %s%n", mProcessedFileCount - mFailedFileCount));
progress.append(format("\tBytes Moved: %s%n", FormatUtils.getSizeFromBytes(mByteCount)));
if (mThroughput != null) {
progress.append(format("\tThroughput: %s/s%n",
FormatUtils.getSizeFromBytes(mThroughput)));
}
progress.append(format("\tFiles failure rate: %.2f%%%n", mFailurePercentage));
progress.append(format("\tFiles Failed: %s%n", mFailedFileCount));
if (mVerbose && !mFailedFilesWithReasons.isEmpty()) {
mFailedFilesWithReasons.forEach((fileName, reason) ->
progress.append(format("\t\t%s: %s%n", fileName, reason)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,29 +122,36 @@ public void testProgressReport() throws Exception {
List<Route> nextRoutes = job.getNextRoutes(25);
job.addCopiedBytes(640 * Constants.MB);
String expectedTextReport = "\tSettings:\tcheck-content: false\n"
+ "\tJob Id: 1\n"
+ "\tJob State: RUNNING\n"
+ "\tFiles Processed: 25\n"
+ "\tBytes Copied: 640.00MB out of 31.25GB\n"
+ "\tFiles failure rate: 0.00%\n"
+ "\tFiles Failed: 0\n";
+ "\tFiles qualified so far: 25, 31.25GB\n"
+ "\tFiles Failed: 0\n"
+ "\tFiles Skipped: 0\n"
+ "\tFiles Succeeded: 25\n"
+ "\tBytes Copied: 640.00MB\n"
+ "\tFiles failure rate: 0.00%\n";
assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, false));
assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, true));
String expectedJsonReport = "{\"mVerbose\":false,\"mJobState\":\"RUNNING\","
+ "\"mCheckContent\":false,\"mProcessedFileCount\":25,"
+ "\"mByteCount\":671088640,\"mTotalByteCount\":33554432000,"
+ "\"mFailurePercentage\":0.0,\"mFailedFileCount\":0,\"mFailedFilesWithReasons\":{}}";
+ "\"mFailurePercentage\":0.0,\"mFailedFileCount\":0,\"mSkippedFileCount\":0,"
+ "\"mFailedFilesWithReasons\":{},\"mJobId\":\"1\"}";
assertEquals(expectedJsonReport, job.getProgress(JobProgressReportFormat.JSON, false));
job.addFailure(nextRoutes.get(0).getSrc(), "Test error 1", 2);
job.addFailure(nextRoutes.get(4).getSrc(), "Test error 2", 2);
job.addFailure(nextRoutes.get(10).getSrc(), "Test error 3", 2);
job.failJob(new InternalRuntimeException("test"));
assertEquals(JobState.FAILED, job.getJobState());
String expectedTextReportWithError = "\tSettings:\tcheck-content: false\n"
+ "\tJob Id: 1\n"
+ "\tJob State: FAILED (alluxio.exception.runtime.InternalRuntimeException: test)\n"
+ "\tFiles Processed: 25\n"
+ "\tBytes Copied: 640.00MB out of 31.25GB\n"
+ "\tFiles failure rate: 12.00%\n"
+ "\tFiles Failed: 3\n";
+ "\tFiles qualified so far: 25, 31.25GB\n"
+ "\tFiles Failed: 3\n"
+ "\tFiles Skipped: 0\n"
+ "\tFiles Succeeded: 22\n"
+ "\tBytes Copied: 640.00MB\n"
+ "\tFiles failure rate: 12.00%\n";
assertEquals(expectedTextReportWithError,
job.getProgress(JobProgressReportFormat.TEXT, false));
String textReport = job.getProgress(JobProgressReportFormat.TEXT, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,11 @@ public void testProgressReport() throws Exception {
String expectedTextReport = "\tSettings:\tcheck-content: false\n"
+ "\tJob Id: 1\n"
+ "\tJob State: RUNNING\n"
+ "\tFiles Processed: 25\n"
+ "\tBytes Moved: 640.00MB out of 31.25GB\n"
+ "\tFiles failure rate: 0.00%\n"
+ "\tFiles Failed: 0\n";
+ "\tFiles qualified so far: 25, 31.25GB\n"
+ "\tFiles Failed: 0\n"
+ "\tFiles Succeeded: 25\n"
+ "\tBytes Moved: 640.00MB\n"
+ "\tFiles failure rate: 0.00%\n";
assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, false));
assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, true));
String expectedJsonReport = "{\"mVerbose\":false,\"mJobState\":\"RUNNING\","
Expand All @@ -153,10 +154,11 @@ public void testProgressReport() throws Exception {
String expectedTextReportWithError = "\tSettings:\tcheck-content: false\n"
+ "\tJob Id: 1\n"
+ "\tJob State: FAILED (alluxio.exception.runtime.InternalRuntimeException: test)\n"
+ "\tFiles Processed: 25\n"
+ "\tBytes Moved: 640.00MB out of 31.25GB\n"
+ "\tFiles failure rate: 12.00%\n"
+ "\tFiles Failed: 3\n";
+ "\tFiles qualified so far: 25, 31.25GB\n"
+ "\tFiles Failed: 3\n"
+ "\tFiles Succeeded: 22\n"
+ "\tBytes Moved: 640.00MB\n"
+ "\tFiles failure rate: 12.00%\n";
assertEquals(expectedTextReportWithError,
job.getProgress(JobProgressReportFormat.TEXT, false));
String textReport = job.getProgress(JobProgressReportFormat.TEXT, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import alluxio.exception.AccessControlException;
import alluxio.exception.FileAlreadyExistsException;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.runtime.FailedPreconditionRuntimeException;
import alluxio.exception.status.NotFoundException;
import alluxio.grpc.Command;
import alluxio.grpc.CommandType;
Expand Down Expand Up @@ -86,6 +87,7 @@
import alluxio.worker.grpc.GrpcExecutors;
import alluxio.worker.task.CopyHandler;
import alluxio.worker.task.DeleteHandler;
import alluxio.worker.task.ValidateHandler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -545,14 +547,22 @@ public ListenableFuture<List<RouteFailure>> copy(List<Route> routes, UfsReadOpti
AuthenticatedClientUser.set(readOptions.getUser());
}
checkCopyPermission(route.getSrc(), route.getDst());
if (ValidateHandler.validate(route, writeOptions, srcFs, dstFs) == -1) {
// Skip copy if there is a failure during validation.
RouteFailure.Builder builder =
RouteFailure.newBuilder().setRoute(route).setIsSkip(true).setCode(0);
errors.add(builder.build());
return;
}
CopyHandler.copy(route, writeOptions, srcFs, dstFs);
} catch (Throwable t) {
boolean permissionCheckSucceeded = !(t instanceof AccessControlException);
LOG.error("Failed to copy {} to {}", route.getSrc(), route.getDst(), t);
AlluxioRuntimeException e = AlluxioRuntimeException.from(t);
RouteFailure.Builder builder =
RouteFailure.newBuilder().setRoute(route).setCode(e.getStatus().getCode().value())
.setRetryable(e.isRetryable() && permissionCheckSucceeded);
.setRetryable(e.isRetryable() && permissionCheckSucceeded)
.setIsSkip(false);
if (e.getMessage() != null) {
builder.setMessage(e.getMessage());
}
Expand Down Expand Up @@ -600,6 +610,10 @@ public ListenableFuture<List<RouteFailure>> move(List<Route> routes, UfsReadOpti
AuthenticatedClientUser.set(readOptions.getUser());
}
checkMovePermission(route.getSrc(), route.getDst());
if (ValidateHandler.validate(route, writeOptions, srcFs, dstFs) == -1) {
throw new FailedPreconditionRuntimeException("File " + route.getDst()
+ " is already in UFS");
}
CopyHandler.copy(route, writeOptions, srcFs, dstFs);
try {
DeleteHandler.delete(new AlluxioURI(route.getSrc()), srcFs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@
import alluxio.Constants;
import alluxio.client.file.FileSystem;
import alluxio.client.file.URIStatus;
import alluxio.exception.AlluxioException;
import alluxio.exception.FileDoesNotExistException;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.runtime.FailedPreconditionRuntimeException;
import alluxio.exception.runtime.InternalRuntimeException;
import alluxio.exception.runtime.InvalidArgumentRuntimeException;
import alluxio.exception.runtime.NotFoundRuntimeException;
import alluxio.grpc.Bits;
import alluxio.grpc.CreateDirectoryPOptions;
import alluxio.grpc.CreateFilePOptions;
Expand All @@ -38,8 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
Expand All @@ -62,37 +55,14 @@ public final class CopyHandler {
*/
public static void copy(Route route, WriteOptions writeOptions,
FileSystem srcFs, FileSystem dstFs) {

AlluxioURI src = new AlluxioURI(route.getSrc());
AlluxioURI dst = new AlluxioURI(route.getDst());
URIStatus dstStatus = null;
URIStatus sourceStatus;
try {
dstStatus = dstFs.getStatus(dst, GET_STATUS_OPTIONS);
} catch (FileNotFoundException | NotFoundRuntimeException ignore) {
// ignored
} catch (FileDoesNotExistException ignore) {
// should not happen
} catch (AlluxioException | IOException e) {
throw new InternalRuntimeException(e);
}
try {
sourceStatus = srcFs.getStatus(src, GET_STATUS_OPTIONS);
} catch (Exception e) {
throw AlluxioRuntimeException.from(e);
}
if (dstStatus != null && dstStatus.isFolder() && sourceStatus.isFolder()) {
// skip copy if it's already a folder there
return;
}
if (dstStatus != null && !dstStatus.isFolder() && !writeOptions.getOverwrite()) {
throw new FailedPreconditionRuntimeException("File " + route.getDst() + " is already in UFS");
}
if (dstStatus != null && (dstStatus.isFolder() != sourceStatus.isFolder())) {
throw new InvalidArgumentRuntimeException(
"Can't replace target because type is not compatible. Target is " + dstStatus
+ ", Source is " + sourceStatus);
}

if (sourceStatus.isFolder()) {
try {
Expand All @@ -103,7 +73,6 @@ public static void copy(Route route, WriteOptions writeOptions,
throw AlluxioRuntimeException.from(e);
}
}

long copiedLength = copyFile(src, dst, srcFs, dstFs, writeOptions.getWriteType());
if (writeOptions.getCheckContent()) {
if (!checkLengthAndContentHash(sourceStatus, dst, dstFs, copiedLength)) {
Expand Down
Loading

0 comments on commit 2ed4307

Please sign in to comment.