diff --git a/common/transport/src/main/proto/grpc/block_worker.proto b/common/transport/src/main/proto/grpc/block_worker.proto index c5295302a30a..d213d842420b 100644 --- a/common/transport/src/main/proto/grpc/block_worker.proto +++ b/common/transport/src/main/proto/grpc/block_worker.proto @@ -324,6 +324,7 @@ message RouteFailure { // A developer-facing error message optional string message = 3; optional bool retryable = 4; + optional bool is_skip = 5; } // next available id: 2 diff --git a/dora/core/common/src/main/java/alluxio/metrics/MetricKey.java b/dora/core/common/src/main/java/alluxio/metrics/MetricKey.java index 4be2e9fc4701..e27b0145a92e 100644 --- a/dora/core/common/src/main/java/alluxio/metrics/MetricKey.java +++ b/dora/core/common/src/main/java/alluxio/metrics/MetricKey.java @@ -1199,6 +1199,12 @@ public static String getSyncMetricName(long mountId) { .setDescription("The number of files failed to be copied by copy commands") .setMetricType(MetricType.COUNTER) .build(); + + public static final MetricKey MASTER_JOB_COPY_SKIP_FILE_COUNT = + new Builder("Master.JobCopySkipFileCount") + .setDescription("The number of files skipped to be copied by copy commands") + .setMetricType(MetricType.COUNTER) + .build(); public static final MetricKey MASTER_JOB_COPY_SIZE = new Builder("Master.JobCopyFileSize") .setDescription("The total block size copied by copy commands") diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/AbstractJob.java b/dora/core/server/master/src/main/java/alluxio/master/job/AbstractJob.java index 3ee2d383ce34..502d4b8198c6 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/AbstractJob.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/AbstractJob.java @@ -18,6 +18,7 @@ import alluxio.scheduler.job.JobState; import alluxio.scheduler.job.Task; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +38,7 @@ public abstract class AbstractJob> implements Job { protected final AtomicInteger mTaskIdGenerator = new AtomicInteger(0); protected JobState mState; // TODO(lucy) make it thread safe state update protected OptionalLong mEndTime = OptionalLong.empty(); - protected final long mStartTime; + protected long mStartTime; protected final Optional mUser; // not making it thread safe as currently scheduler has been single-threaded protected final LinkedHashSet mRetryTaskList = new LinkedHashSet<>(); @@ -107,6 +108,17 @@ public void setEndTime(long time) { mEndTime = OptionalLong.of(time); } + /** + * Update start time. + * This is for internal tests. + * + * @param time time in ms + */ + @VisibleForTesting + public void setStartTime(long time) { + mStartTime = time; + } + /** * Get load status. * diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/CopyJob.java b/dora/core/server/master/src/main/java/alluxio/master/job/CopyJob.java index 72454015688c..2ba5a3350f75 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/CopyJob.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/CopyJob.java @@ -62,6 +62,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -108,6 +109,7 @@ public class CopyJob extends AbstractJob { private final AtomicLong mTotalByteCount = new AtomicLong(); private final AtomicLong mTotalFailureCount = new AtomicLong(); private final AtomicLong mCurrentFailureCount = new AtomicLong(); + private final AtomicLong mCurrentSkipCount = new AtomicLong(); private Optional mFailedReason = Optional.empty(); private final Iterable mFileIterable; private Optional> mFileIterator = Optional.empty(); @@ -377,6 +379,16 @@ public void addFailure(String src, String message, int code) { COPY_FAIL_FILE_COUNT.inc(); } + /** + * Add a skip to metrics. + * + */ + @VisibleForTesting + public void addSkip() { + mCurrentSkipCount.incrementAndGet(); + COPY_SKIP_FILE_COUNT.inc(); + } + private Route buildRoute(FileInfo sourceFile) { String relativePath; try { @@ -465,9 +477,12 @@ public boolean processResponse(CopyTask task) { for (RouteFailure status : response.getFailuresList()) { totalBytes -= status.getRoute().getLength(); if (!isHealthy() || !status.getRetryable() || !addToRetry( - status.getRoute())) { + status.getRoute()) || !status.getIsSkip()) { addFailure(status.getRoute().getSrc(), status.getMessage(), status.getCode()); } + if (status.getIsSkip()) { + addSkip(); + } } } addCopiedBytes(totalBytes); @@ -576,11 +591,16 @@ private static class CopyProgressReport { private final double mFailurePercentage; private final AlluxioRuntimeException mFailureReason; private final long mFailedFileCount; + private final long mSkippedFileCount; private final Map mFailedFilesWithReasons; + private final String mJobId; + private final long mStartTime; + private final long mEndTime; public CopyProgressReport(CopyJob job, boolean verbose) { mVerbose = verbose; + mJobId = job.mJobId; mJobState = job.mState; mCheckContent = job.mCheckContent; mProcessedFileCount = job.mProcessedFileCount.get(); @@ -609,11 +629,23 @@ public CopyProgressReport(CopyJob job, boolean verbose) } mFailureReason = job.mFailedReason.orElse(null); mFailedFileCount = job.mFailedFiles.size(); + mSkippedFileCount = job.mCurrentSkipCount.get(); if (verbose && mFailedFileCount > 0) { mFailedFilesWithReasons = job.mFailedFiles; } else { mFailedFilesWithReasons = Collections.emptyMap(); } + mStartTime = job.mStartTime; + if (mJobState == JobState.SUCCEEDED || mJobState == JobState.FAILED) { + if (job.mEndTime.isPresent()) { + mEndTime = job.mEndTime.getAsLong(); + } else { + throw new InternalRuntimeException( + String.format("No end time in ending state %s", mJobState)); + } + } else { + mEndTime = 0; + } } public String getReport(JobProgressReportFormat format) @@ -632,29 +664,44 @@ public String getReport(JobProgressReportFormat format) private String getTextReport() { StringBuilder progress = new StringBuilder(); progress.append( - format("\tSettings:\tcheck-content: %s%n", mCheckContent)); - progress.append(format("\tJob State: %s%s%n", mJobState, - mFailureReason == null - ? "" : format( - " (%s: %s)", - mFailureReason.getClass().getName(), - mFailureReason.getMessage()))); + format("\tSettings: \"check-content: %s\"%n", mCheckContent)); + progress.append(format("\tJob Submitted: %s%n", new Date(mStartTime))); + progress.append(format("\tJob Id: %s%n", mJobId)); + if (mJobState == JobState.SUCCEEDED || mJobState == JobState.FAILED) { + progress.append(format("\tJob State: %s%s, finished at %s%n", mJobState, + mFailureReason == null + ? "" : format( + " (%s: %s)", + mFailureReason.getClass().getName(), + mFailureReason.getMessage()), + new Date(mEndTime))); + } else { + progress.append(format("\tJob State: %s%s%n", mJobState, + mFailureReason == null + ? "" : format( + " (%s: %s)", + mFailureReason.getClass().getName(), + mFailureReason.getMessage()))); + } if (mVerbose && mFailureReason != null) { for (StackTraceElement stack : mFailureReason.getStackTrace()) { progress.append(format("\t\t%s%n", stack.toString())); } } - progress.append(format("\tFiles Processed: %d%n", mProcessedFileCount)); - progress.append(format("\tBytes Copied: %s%s%n", - FormatUtils.getSizeFromBytes(mByteCount), + progress.append(format("\tFiles qualified%s: %d%s%n", + mJobState == JobState.RUNNING ? " so far" : "", mProcessedFileCount, mTotalByteCount == null - ? "" : format(" out of %s", FormatUtils.getSizeFromBytes(mTotalByteCount)))); + ? "" : format(", %s", FormatUtils.getSizeFromBytes(mTotalByteCount)))); + progress.append(format("\tFiles Failed: %s%n", mFailedFileCount)); + progress.append(format("\tFiles Skipped: %s%n", mSkippedFileCount)); + progress.append(format("\tFiles Succeeded: %s%n", mProcessedFileCount + - mFailedFileCount - mSkippedFileCount)); + progress.append(format("\tBytes Copied: %s%n", FormatUtils.getSizeFromBytes(mByteCount))); if (mThroughput != null) { progress.append(format("\tThroughput: %s/s%n", FormatUtils.getSizeFromBytes(mThroughput))); } progress.append(format("\tFiles failure rate: %.2f%%%n", mFailurePercentage)); - progress.append(format("\tFiles Failed: %s%n", mFailedFileCount)); if (mVerbose && !mFailedFilesWithReasons.isEmpty()) { mFailedFilesWithReasons.forEach((fileName, reason) -> progress.append(format("\t\t%s: %s%n", fileName, reason))); @@ -683,6 +730,8 @@ private String getJsonReport() { MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_FILE_COUNT.getName()); public static final Counter COPY_FAIL_FILE_COUNT = MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_FAIL_FILE_COUNT.getName()); + public static final Counter COPY_SKIP_FILE_COUNT = + MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_SKIP_FILE_COUNT.getName()); public static final Counter COPY_SIZE = MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_SIZE.getName()); public static final Meter COPY_RATE = diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/MoveJob.java b/dora/core/server/master/src/main/java/alluxio/master/job/MoveJob.java index bb71728db1d7..ed73a325e992 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/MoveJob.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/MoveJob.java @@ -62,6 +62,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -578,6 +579,8 @@ private static class MoveProgressReport { private final long mFailedFileCount; private final Map mFailedFilesWithReasons; private final String mJobId; + private final long mStartTime; + private final long mEndTime; public MoveProgressReport(MoveJob job, boolean verbose) { @@ -616,6 +619,17 @@ public MoveProgressReport(MoveJob job, boolean verbose) } else { mFailedFilesWithReasons = Collections.emptyMap(); } + mStartTime = job.mStartTime; + if (mJobState == JobState.SUCCEEDED || mJobState == JobState.FAILED) { + if (job.mEndTime.isPresent()) { + mEndTime = job.mEndTime.getAsLong(); + } else { + throw new InternalRuntimeException( + String.format("No end time in ending state %s", mJobState)); + } + } else { + mEndTime = 0; + } } public String getReport(JobProgressReportFormat format) @@ -634,30 +648,42 @@ public String getReport(JobProgressReportFormat format) private String getTextReport() { StringBuilder progress = new StringBuilder(); progress.append( - format("\tSettings:\tcheck-content: %s%n", mCheckContent)); + format("\tSettings: \"check-content: %s\"%n", mCheckContent)); + progress.append(format("\tJob Submitted: %s%n", new Date(mStartTime))); progress.append(format("\tJob Id: %s%n", mJobId)); - progress.append(format("\tJob State: %s%s%n", mJobState, - mFailureReason == null - ? "" : format( - " (%s: %s)", - mFailureReason.getClass().getName(), - mFailureReason.getMessage()))); + if (mJobState == JobState.SUCCEEDED || mJobState == JobState.FAILED) { + progress.append(format("\tJob State: %s%s, finished at %s%n", mJobState, + mFailureReason == null + ? "" : format( + " (%s: %s)", + mFailureReason.getClass().getName(), + mFailureReason.getMessage()), + new Date(mEndTime))); + } else { + progress.append(format("\tJob State: %s%s%n", mJobState, + mFailureReason == null + ? "" : format( + " (%s: %s)", + mFailureReason.getClass().getName(), + mFailureReason.getMessage()))); + } if (mVerbose && mFailureReason != null) { for (StackTraceElement stack : mFailureReason.getStackTrace()) { progress.append(format("\t\t%s%n", stack.toString())); } } - progress.append(format("\tFiles Processed: %d%n", mProcessedFileCount)); - progress.append(format("\tBytes Moved: %s%s%n", - FormatUtils.getSizeFromBytes(mByteCount), + progress.append(format("\tFiles qualified%s: %d%s%n", + mJobState == JobState.RUNNING ? " so far" : "", mProcessedFileCount, mTotalByteCount == null - ? "" : format(" out of %s", FormatUtils.getSizeFromBytes(mTotalByteCount)))); + ? "" : format(", %s", FormatUtils.getSizeFromBytes(mTotalByteCount)))); + progress.append(format("\tFiles Failed: %s%n", mFailedFileCount)); + progress.append(format("\tFiles Succeeded: %s%n", mProcessedFileCount - mFailedFileCount)); + progress.append(format("\tBytes Moved: %s%n", FormatUtils.getSizeFromBytes(mByteCount))); if (mThroughput != null) { progress.append(format("\tThroughput: %s/s%n", FormatUtils.getSizeFromBytes(mThroughput))); } progress.append(format("\tFiles failure rate: %.2f%%%n", mFailurePercentage)); - progress.append(format("\tFiles Failed: %s%n", mFailedFileCount)); if (mVerbose && !mFailedFilesWithReasons.isEmpty()) { mFailedFilesWithReasons.forEach((fileName, reason) -> progress.append(format("\t\t%s: %s%n", fileName, reason))); diff --git a/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/CopyJobTest.java b/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/CopyJobTest.java index a622fb6d4a18..070bf60309d7 100644 --- a/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/CopyJobTest.java +++ b/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/CopyJobTest.java @@ -119,32 +119,45 @@ public void testProgressReport() throws Exception { OptionalLong.empty(), false, false, false, files, Optional.empty())); when(job.getDurationInSec()).thenReturn(0L); job.setJobState(JobState.RUNNING, false); + job.setStartTime(1690000000000L); List nextRoutes = job.getNextRoutes(25); job.addCopiedBytes(640 * Constants.MB); - String expectedTextReport = "\tSettings:\tcheck-content: false\n" + String expectedTextReport = "\tSettings: \"check-content: false\"\n" + + "\tJob Submitted: Sat Jul 22 04:26:40 UTC 2023\n" + + "\tJob Id: 1\n" + "\tJob State: RUNNING\n" - + "\tFiles Processed: 25\n" - + "\tBytes Copied: 640.00MB out of 31.25GB\n" - + "\tFiles failure rate: 0.00%\n" - + "\tFiles Failed: 0\n"; + + "\tFiles qualified so far: 25, 31.25GB\n" + + "\tFiles Failed: 0\n" + + "\tFiles Skipped: 0\n" + + "\tFiles Succeeded: 25\n" + + "\tBytes Copied: 640.00MB\n" + + "\tFiles failure rate: 0.00%\n"; assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, false)); assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, true)); String expectedJsonReport = "{\"mVerbose\":false,\"mJobState\":\"RUNNING\"," + "\"mCheckContent\":false,\"mProcessedFileCount\":25," + "\"mByteCount\":671088640,\"mTotalByteCount\":33554432000," - + "\"mFailurePercentage\":0.0,\"mFailedFileCount\":0,\"mFailedFilesWithReasons\":{}}"; + + "\"mFailurePercentage\":0.0,\"mFailedFileCount\":0,\"mSkippedFileCount\":0," + + "\"mFailedFilesWithReasons\":{},\"mJobId\":\"1\",\"mStartTime\":1690000000000," + + "\"mEndTime\":0}"; assertEquals(expectedJsonReport, job.getProgress(JobProgressReportFormat.JSON, false)); job.addFailure(nextRoutes.get(0).getSrc(), "Test error 1", 2); job.addFailure(nextRoutes.get(4).getSrc(), "Test error 2", 2); job.addFailure(nextRoutes.get(10).getSrc(), "Test error 3", 2); job.failJob(new InternalRuntimeException("test")); + job.setEndTime(1700000000000L); assertEquals(JobState.FAILED, job.getJobState()); - String expectedTextReportWithError = "\tSettings:\tcheck-content: false\n" - + "\tJob State: FAILED (alluxio.exception.runtime.InternalRuntimeException: test)\n" - + "\tFiles Processed: 25\n" - + "\tBytes Copied: 640.00MB out of 31.25GB\n" - + "\tFiles failure rate: 12.00%\n" - + "\tFiles Failed: 3\n"; + String expectedTextReportWithError = "\tSettings: \"check-content: false\"\n" + + "\tJob Submitted: Sat Jul 22 04:26:40 UTC 2023\n" + + "\tJob Id: 1\n" + + "\tJob State: FAILED (alluxio.exception.runtime.InternalRuntimeException: test), " + + "finished at Tue Nov 14 22:13:20 UTC 2023\n" + + "\tFiles qualified: 25, 31.25GB\n" + + "\tFiles Failed: 3\n" + + "\tFiles Skipped: 0\n" + + "\tFiles Succeeded: 22\n" + + "\tBytes Copied: 640.00MB\n" + + "\tFiles failure rate: 12.00%\n"; assertEquals(expectedTextReportWithError, job.getProgress(JobProgressReportFormat.TEXT, false)); String textReport = job.getProgress(JobProgressReportFormat.TEXT, true); diff --git a/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/MoveJobTest.java b/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/MoveJobTest.java index d9c4eda396b9..2c2694778825 100644 --- a/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/MoveJobTest.java +++ b/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/MoveJobTest.java @@ -128,35 +128,43 @@ public void testProgressReport() throws Exception { Optional.empty())); when(job.getDurationInSec()).thenReturn(0L); job.setJobState(JobState.RUNNING, false); + job.setStartTime(1690000000000L); List nextRoutes = job.getNextRoutes(25); job.addMovedBytes(640 * Constants.MB); - String expectedTextReport = "\tSettings:\tcheck-content: false\n" + String expectedTextReport = "\tSettings: \"check-content: false\"\n" + + "\tJob Submitted: Sat Jul 22 04:26:40 UTC 2023\n" + "\tJob Id: 1\n" + "\tJob State: RUNNING\n" - + "\tFiles Processed: 25\n" - + "\tBytes Moved: 640.00MB out of 31.25GB\n" - + "\tFiles failure rate: 0.00%\n" - + "\tFiles Failed: 0\n"; + + "\tFiles qualified so far: 25, 31.25GB\n" + + "\tFiles Failed: 0\n" + + "\tFiles Succeeded: 25\n" + + "\tBytes Moved: 640.00MB\n" + + "\tFiles failure rate: 0.00%\n"; assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, false)); assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, true)); String expectedJsonReport = "{\"mVerbose\":false,\"mJobState\":\"RUNNING\"," + "\"mCheckContent\":false,\"mProcessedFileCount\":25," + "\"mByteCount\":671088640,\"mTotalByteCount\":33554432000," + "\"mFailurePercentage\":0.0,\"mFailedFileCount\":0," - + "\"mFailedFilesWithReasons\":{},\"mJobId\":\"1\"}"; + + "\"mFailedFilesWithReasons\":{},\"mJobId\":\"1\",\"mStartTime\":1690000000000," + + "\"mEndTime\":0}"; assertEquals(expectedJsonReport, job.getProgress(JobProgressReportFormat.JSON, false)); job.addFailure(nextRoutes.get(0).getSrc(), "Test error 1", 2); job.addFailure(nextRoutes.get(4).getSrc(), "Test error 2", 2); job.addFailure(nextRoutes.get(10).getSrc(), "Test error 3", 2); job.failJob(new InternalRuntimeException("test")); + job.setEndTime(1700000000000L); assertEquals(JobState.FAILED, job.getJobState()); - String expectedTextReportWithError = "\tSettings:\tcheck-content: false\n" + String expectedTextReportWithError = "\tSettings: \"check-content: false\"\n" + + "\tJob Submitted: Sat Jul 22 04:26:40 UTC 2023\n" + "\tJob Id: 1\n" - + "\tJob State: FAILED (alluxio.exception.runtime.InternalRuntimeException: test)\n" - + "\tFiles Processed: 25\n" - + "\tBytes Moved: 640.00MB out of 31.25GB\n" - + "\tFiles failure rate: 12.00%\n" - + "\tFiles Failed: 3\n"; + + "\tJob State: FAILED (alluxio.exception.runtime.InternalRuntimeException: test), " + + "finished at Tue Nov 14 22:13:20 UTC 2023\n" + + "\tFiles qualified: 25, 31.25GB\n" + + "\tFiles Failed: 3\n" + + "\tFiles Succeeded: 22\n" + + "\tBytes Moved: 640.00MB\n" + + "\tFiles failure rate: 12.00%\n"; assertEquals(expectedTextReportWithError, job.getProgress(JobProgressReportFormat.TEXT, false)); String textReport = job.getProgress(JobProgressReportFormat.TEXT, true); diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java index 28da23bbc070..2b8410f03c3e 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java @@ -30,6 +30,7 @@ import alluxio.exception.AccessControlException; import alluxio.exception.FileAlreadyExistsException; import alluxio.exception.runtime.AlluxioRuntimeException; +import alluxio.exception.runtime.FailedPreconditionRuntimeException; import alluxio.exception.runtime.UnavailableRuntimeException; import alluxio.exception.status.NotFoundException; import alluxio.grpc.Command; @@ -91,6 +92,7 @@ import alluxio.worker.grpc.GrpcExecutors; import alluxio.worker.task.CopyHandler; import alluxio.worker.task.DeleteHandler; +import alluxio.worker.task.ValidateHandler; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -593,6 +595,13 @@ public ListenableFuture> copy(List routes, UfsReadOpti AuthenticatedClientUser.set(readOptions.getUser()); } checkCopyPermission(route.getSrc(), route.getDst()); + if (!ValidateHandler.validate(route, writeOptions, srcFs, dstFs)) { + // Skip copy if there is a failure during validation. + RouteFailure.Builder builder = + RouteFailure.newBuilder().setRoute(route).setIsSkip(true).setCode(0); + errors.add(builder.build()); + return; + } CopyHandler.copy(route, writeOptions, srcFs, dstFs); } catch (Throwable t) { boolean permissionCheckSucceeded = !(t instanceof AccessControlException); @@ -600,7 +609,8 @@ public ListenableFuture> copy(List routes, UfsReadOpti AlluxioRuntimeException e = AlluxioRuntimeException.from(t); RouteFailure.Builder builder = RouteFailure.newBuilder().setRoute(route).setCode(e.getStatus().getCode().value()) - .setRetryable(e.isRetryable() && permissionCheckSucceeded); + .setRetryable(e.isRetryable() && permissionCheckSucceeded) + .setIsSkip(false); if (e.getMessage() != null) { builder.setMessage(e.getMessage()); } @@ -648,6 +658,10 @@ public ListenableFuture> move(List routes, UfsReadOpti AuthenticatedClientUser.set(readOptions.getUser()); } checkMovePermission(route.getSrc(), route.getDst()); + if (!ValidateHandler.validate(route, writeOptions, srcFs, dstFs)) { + throw new FailedPreconditionRuntimeException("File " + route.getDst() + + " is already in UFS"); + } CopyHandler.copy(route, writeOptions, srcFs, dstFs); try { DeleteHandler.delete(new AlluxioURI(route.getSrc()), srcFs); diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/task/CopyHandler.java b/dora/core/server/worker/src/main/java/alluxio/worker/task/CopyHandler.java index 30dee8462c3e..a5e890850839 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/task/CopyHandler.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/task/CopyHandler.java @@ -15,13 +15,8 @@ import alluxio.Constants; import alluxio.client.file.FileSystem; import alluxio.client.file.URIStatus; -import alluxio.exception.AlluxioException; -import alluxio.exception.FileDoesNotExistException; import alluxio.exception.runtime.AlluxioRuntimeException; -import alluxio.exception.runtime.FailedPreconditionRuntimeException; import alluxio.exception.runtime.InternalRuntimeException; -import alluxio.exception.runtime.InvalidArgumentRuntimeException; -import alluxio.exception.runtime.NotFoundRuntimeException; import alluxio.grpc.Bits; import alluxio.grpc.CreateDirectoryPOptions; import alluxio.grpc.CreateFilePOptions; @@ -38,8 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Objects; @@ -62,37 +55,14 @@ public final class CopyHandler { */ public static void copy(Route route, WriteOptions writeOptions, FileSystem srcFs, FileSystem dstFs) { - AlluxioURI src = new AlluxioURI(route.getSrc()); AlluxioURI dst = new AlluxioURI(route.getDst()); - URIStatus dstStatus = null; URIStatus sourceStatus; - try { - dstStatus = dstFs.getStatus(dst, GET_STATUS_OPTIONS); - } catch (FileNotFoundException | NotFoundRuntimeException ignore) { - // ignored - } catch (FileDoesNotExistException ignore) { - // should not happen - } catch (AlluxioException | IOException e) { - throw new InternalRuntimeException(e); - } try { sourceStatus = srcFs.getStatus(src, GET_STATUS_OPTIONS); } catch (Exception e) { throw AlluxioRuntimeException.from(e); } - if (dstStatus != null && dstStatus.isFolder() && sourceStatus.isFolder()) { - // skip copy if it's already a folder there - return; - } - if (dstStatus != null && !dstStatus.isFolder() && !writeOptions.getOverwrite()) { - throw new FailedPreconditionRuntimeException("File " + route.getDst() + " is already in UFS"); - } - if (dstStatus != null && (dstStatus.isFolder() != sourceStatus.isFolder())) { - throw new InvalidArgumentRuntimeException( - "Can't replace target because type is not compatible. Target is " + dstStatus - + ", Source is " + sourceStatus); - } if (sourceStatus.isFolder()) { try { @@ -103,7 +73,6 @@ public static void copy(Route route, WriteOptions writeOptions, throw AlluxioRuntimeException.from(e); } } - long copiedLength = copyFile(src, dst, srcFs, dstFs, writeOptions.getWriteType()); if (writeOptions.getCheckContent()) { if (!checkLengthAndContentHash(sourceStatus, dst, dstFs, copiedLength)) { diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/task/ValidateHandler.java b/dora/core/server/worker/src/main/java/alluxio/worker/task/ValidateHandler.java new file mode 100644 index 000000000000..3ab5d2a2f940 --- /dev/null +++ b/dora/core/server/worker/src/main/java/alluxio/worker/task/ValidateHandler.java @@ -0,0 +1,89 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.worker.task; + +import alluxio.AlluxioURI; +import alluxio.client.file.FileSystem; +import alluxio.client.file.URIStatus; +import alluxio.exception.AlluxioException; +import alluxio.exception.FileDoesNotExistException; +import alluxio.exception.runtime.AlluxioRuntimeException; +import alluxio.exception.runtime.InternalRuntimeException; +import alluxio.exception.runtime.InvalidArgumentRuntimeException; +import alluxio.exception.runtime.NotFoundRuntimeException; +import alluxio.grpc.GetStatusPOptions; +import alluxio.grpc.Route; +import alluxio.grpc.WriteOptions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; + +/** + * ValidateHandler is responsible for validating files at source and destination. + */ +public class ValidateHandler { + private static final Logger LOG = LoggerFactory.getLogger(ValidateHandler.class); + private static final GetStatusPOptions GET_STATUS_OPTIONS = + GetStatusPOptions.getDefaultInstance().toBuilder().setIncludeRealContentHash(true).build(); + + /** + * Validates a file from source to destination. + * + * @param route the route + * @param writeOptions the write options + * @param srcFs the source file system + * @param dstFs the destination file system + * @return true means the validation passes + * false means the file is already in the target UFS + */ + public static boolean validate(Route route, WriteOptions writeOptions, + FileSystem srcFs, FileSystem dstFs) { + + AlluxioURI src = new AlluxioURI(route.getSrc()); + AlluxioURI dst = new AlluxioURI(route.getDst()); + URIStatus dstStatus = null; + URIStatus sourceStatus; + try { + dstStatus = dstFs.getStatus(dst, GET_STATUS_OPTIONS); + } catch (FileNotFoundException | NotFoundRuntimeException ignore) { + // ignored + } catch (FileDoesNotExistException ignore) { + // should not happen + } catch (AlluxioException | IOException e) { + throw new InternalRuntimeException(e); + } + try { + sourceStatus = srcFs.getStatus(src, GET_STATUS_OPTIONS); + } catch (Exception e) { + throw AlluxioRuntimeException.from(e); + } + if (dstStatus != null && dstStatus.isFolder() && sourceStatus.isFolder()) { + // skip if it's already a folder there + return true; + } + if (dstStatus != null && !dstStatus.isFolder() && !writeOptions.getOverwrite()) { + LOG.debug("File " + route.getDst() + " is already in target UFS"); + return false; + } + if (dstStatus != null && (dstStatus.isFolder() != sourceStatus.isFolder())) { + LOG.debug("Can't replace target because type is not compatible. Target is " + dstStatus + + ", Source is " + sourceStatus); + throw new InvalidArgumentRuntimeException( + "Can't replace target because type is not compatible. Target is " + dstStatus + + ", Source is " + sourceStatus); + } + return true; + } +} diff --git a/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java b/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java index 34c0ef24eeac..3f6478225b59 100644 --- a/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java +++ b/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java @@ -40,6 +40,7 @@ import alluxio.grpc.SetAttributePOptions; import alluxio.grpc.UfsReadOptions; import alluxio.grpc.WriteOptions; +import alluxio.grpc.WritePType; import alluxio.membership.MembershipManager; import alluxio.security.authorization.Mode; import alluxio.underfs.UfsStatus; @@ -208,7 +209,6 @@ public void testSingleFolderCopy() throws IOException, ExecutionException, Inter } @Test - @Ignore public void testFolderWithFileCopy() throws IOException, ExecutionException, InterruptedException { File srcRoot = mTestFolder.newFolder("src"); @@ -257,6 +257,58 @@ public void testFolderWithFileCopy() } } + @Test + public void testFolderWithFileCopyWithSkip() + throws IOException, ExecutionException, InterruptedException { + File srcRoot = mTestFolder.newFolder("src"); + File dstRoot = mTestFolder.newFolder("dst"); + // create test file under mSrcFolder + File a = new File(srcRoot, "a"); + a.mkdirs(); + File c = new File(a, "c"); + c.createNewFile(); + File d = new File(a, "d"); + d.createNewFile(); + File b = new File(dstRoot, "b"); + b.mkdirs(); + File dstC = new File(b, "c"); + File dstD = new File(b, "d"); + dstD.createNewFile(); + int length = 10; + byte[] buffer = BufferUtils.getIncreasingByteArray(length); + BufferUtils.writeBufferToFile(c.getAbsolutePath(), buffer); + List routes = new ArrayList<>(); + Route route = Route.newBuilder().setDst(dstC.getAbsolutePath()).setSrc(c.getAbsolutePath()) + .setLength(length).build(); + Route route2 = + Route.newBuilder().setDst(b.getAbsolutePath()).setSrc(a.getAbsolutePath()).build(); + Route route3 = + Route.newBuilder().setDst(dstD.getAbsolutePath()).setSrc(d.getAbsolutePath()).build(); + routes.add(route); + routes.add(route2); + routes.add(route3); + WriteOptions writeOptions = + WriteOptions.newBuilder().setOverwrite(false).setCheckContent(true).build(); + UfsReadOptions read = + UfsReadOptions.newBuilder().setUser("test").setTag("1").setPositionShort(false).build(); + ListenableFuture> copy = mWorker.copy(routes, read, writeOptions); + List failures = copy.get(); + + assertEquals(1, failures.size()); + assertTrue(failures.get(0).getIsSkip()); + Assert.assertTrue(dstC.exists()); + Assert.assertTrue(b.exists()); + Assert.assertTrue(b.isDirectory()); + Assert.assertTrue(dstD.exists()); + Assert.assertTrue(dstD.isFile()); + try (InputStream in = Files.newInputStream(dstC.toPath())) { + byte[] readBuffer = new byte[length]; + while (in.read(readBuffer) != -1) { + } + Assert.assertArrayEquals(buffer, readBuffer); + } + } + @Test public void testSingleFileMove() throws IOException, ExecutionException, InterruptedException { File srcRoot = mTestFolder.newFolder("src"); @@ -289,6 +341,60 @@ public void testSingleFileMove() throws IOException, ExecutionException, Interru } } + @Test + public void testSingleFileCopySkip() throws IOException, ExecutionException, + InterruptedException { + File srcRoot = mTestFolder.newFolder("src"); + File dstRoot = mTestFolder.newFolder("dst"); + // create test file under mSrcFolder + File a = new File(srcRoot, "a"); + a.createNewFile(); + File b = new File(dstRoot, "b"); + b.createNewFile(); + int length = 10; + byte[] buffer = BufferUtils.getIncreasingByteArray(length); + BufferUtils.writeBufferToFile(a.getAbsolutePath(), buffer); + Route route = + Route.newBuilder().setDst(b.getAbsolutePath()).setSrc(a.getAbsolutePath()) + .setLength(length).build(); + WriteOptions writeOptions = + WriteOptions.newBuilder().setOverwrite(false).setCheckContent(true).build(); + UfsReadOptions read = + UfsReadOptions.newBuilder().setUser("test").setTag("1").setPositionShort(false).build(); + ListenableFuture> copy = + mWorker.copy(Collections.singletonList(route), read, writeOptions); + List failures = copy.get(); + assertEquals(1, failures.size()); + assertTrue(failures.get(0).getIsSkip()); + } + + @Test + public void testSingleFileMopyNoSkip() throws IOException, ExecutionException, + InterruptedException { + File srcRoot = mTestFolder.newFolder("src"); + File dstRoot = mTestFolder.newFolder("dst"); + // create test file under mSrcFolder + File a = new File(srcRoot, "a"); + a.createNewFile(); + File b = new File(dstRoot, "b"); + b.createNewFile(); + int length = 10; + byte[] buffer = BufferUtils.getIncreasingByteArray(length); + BufferUtils.writeBufferToFile(a.getAbsolutePath(), buffer); + Route route = + Route.newBuilder().setDst(b.getAbsolutePath()).setSrc(a.getAbsolutePath()) + .setLength(length).build(); + WriteOptions writeOptions = + WriteOptions.newBuilder().setOverwrite(false).setCheckContent(true).build(); + UfsReadOptions read = + UfsReadOptions.newBuilder().setUser("test").setTag("1").setPositionShort(false).build(); + ListenableFuture> copy = + mWorker.move(Collections.singletonList(route), read, writeOptions); + List failures = copy.get(); + assertEquals(1, failures.size()); + assertFalse(failures.get(0).hasIsSkip()); + } + @Test public void testMoveException() throws IOException, ExecutionException, InterruptedException { File srcRoot = mTestFolder.newFolder("srcException"); @@ -335,18 +441,17 @@ public void testSingleFolderMove() throws IOException, ExecutionException, Inter } @Test - @Ignore public void testFolderWithFileMove() throws IOException, ExecutionException, InterruptedException { File srcRoot = mTestFolder.newFolder("src"); File dstRoot = mTestFolder.newFolder("dst"); // create test file under mSrcFolder File a = new File(srcRoot, "a"); - a.mkdirs(); + assertTrue(a.mkdirs()); File c = new File(a, "c"); - c.createNewFile(); + assertTrue(c.createNewFile()); File d = new File(a, "d"); - d.mkdirs(); + assertTrue(d.mkdirs()); File b = new File(dstRoot, "b"); File dstC = new File(b, "c"); File dstD = new File(b, "d"); @@ -357,18 +462,23 @@ public void testFolderWithFileMove() Route route = Route.newBuilder().setDst(dstC.getAbsolutePath()).setSrc(c.getAbsolutePath()) .setLength(length).build(); Route route2 = - Route.newBuilder().setDst(b.getAbsolutePath()).setSrc(a.getAbsolutePath()).build(); - Route route3 = Route.newBuilder().setDst(dstD.getAbsolutePath()).setSrc(d.getAbsolutePath()).build(); + Route route3 = + Route.newBuilder().setDst(b.getAbsolutePath()).setSrc(a.getAbsolutePath()).build(); routes.add(route); routes.add(route2); - routes.add(route3); WriteOptions writeOptions = - WriteOptions.newBuilder().setOverwrite(false).setCheckContent(true).build(); + WriteOptions.newBuilder().setOverwrite(false).setCheckContent(true) + .setWriteType(WritePType.CACHE_THROUGH).build(); UfsReadOptions read = UfsReadOptions.newBuilder().setUser("test").setTag("1").setPositionShort(false).build(); ListenableFuture> move = mWorker.move(routes, read, writeOptions); List failures = move.get(); + assertEquals(0, failures.size()); + List routes2 = new ArrayList<>(); + routes2.add(route3); + ListenableFuture> move2 = mWorker.move(routes2, read, writeOptions); + failures = move2.get(); assertEquals(0, failures.size()); Assert.assertTrue(dstC.exists());