Skip to content

Commit

Permalink
Skip copy if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
ssyssy committed Aug 10, 2023
1 parent d6e3ea1 commit 63dc5b0
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 91 deletions.
1 change: 1 addition & 0 deletions common/transport/src/main/proto/grpc/block_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ message RouteFailure {
// A developer-facing error message
optional string message = 3;
optional bool retryable = 4;
optional bool is_skip = 5;
}

// next available id: 2
Expand Down
6 changes: 6 additions & 0 deletions dora/core/common/src/main/java/alluxio/metrics/MetricKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,12 @@ public static String getSyncMetricName(long mountId) {
.setDescription("The number of files failed to be copied by copy commands")
.setMetricType(MetricType.COUNTER)
.build();

public static final MetricKey MASTER_JOB_COPY_SKIP_FILE_COUNT =
new Builder("Master.JobCopySkipFileCount")
.setDescription("The number of files skipped to be copied by copy commands")
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey MASTER_JOB_COPY_SIZE =
new Builder("Master.JobCopyFileSize")
.setDescription("The total block size copied by copy commands")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import alluxio.scheduler.job.JobState;
import alluxio.scheduler.job.Task;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,7 +38,7 @@ public abstract class AbstractJob<T extends Task<?>> implements Job<T> {
protected final AtomicInteger mTaskIdGenerator = new AtomicInteger(0);
protected JobState mState; // TODO(lucy) make it thread safe state update
protected OptionalLong mEndTime = OptionalLong.empty();
protected final long mStartTime;
protected long mStartTime;
protected final Optional<String> mUser;
// not making it thread safe as currently scheduler has been single-threaded
protected final LinkedHashSet<T> mRetryTaskList = new LinkedHashSet<>();
Expand Down Expand Up @@ -107,6 +108,17 @@ public void setEndTime(long time) {
mEndTime = OptionalLong.of(time);
}

/**
* Update start time.
* This is for internal tests.
*
* @param time time in ms
*/
@VisibleForTesting
public void setStartTime(long time) {
mStartTime = time;
}

/**
* Get load status.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class CopyJob extends AbstractJob<CopyJob.CopyTask> {
private final AtomicLong mTotalByteCount = new AtomicLong();
private final AtomicLong mTotalFailureCount = new AtomicLong();
private final AtomicLong mCurrentFailureCount = new AtomicLong();
private final AtomicLong mCurrentSkipCount = new AtomicLong();
private Optional<AlluxioRuntimeException> mFailedReason = Optional.empty();
private final Iterable<FileInfo> mFileIterable;
private Optional<Iterator<FileInfo>> mFileIterator = Optional.empty();
Expand Down Expand Up @@ -377,6 +379,16 @@ public void addFailure(String src, String message, int code) {
COPY_FAIL_FILE_COUNT.inc();
}

/**
* Add a skip to metrics.
*
*/
@VisibleForTesting
public void addSkip() {
mCurrentSkipCount.incrementAndGet();
COPY_SKIP_FILE_COUNT.inc();
}

private Route buildRoute(FileInfo sourceFile) {
String relativePath;
try {
Expand Down Expand Up @@ -465,9 +477,12 @@ public boolean processResponse(CopyTask task) {
for (RouteFailure status : response.getFailuresList()) {
totalBytes -= status.getRoute().getLength();
if (!isHealthy() || !status.getRetryable() || !addToRetry(
status.getRoute())) {
status.getRoute()) || !status.getIsSkip()) {
addFailure(status.getRoute().getSrc(), status.getMessage(), status.getCode());
}
if (status.getIsSkip()) {
addSkip();
}
}
}
addCopiedBytes(totalBytes);
Expand Down Expand Up @@ -576,11 +591,16 @@ private static class CopyProgressReport {
private final double mFailurePercentage;
private final AlluxioRuntimeException mFailureReason;
private final long mFailedFileCount;
private final long mSkippedFileCount;
private final Map<String, String> mFailedFilesWithReasons;
private final String mJobId;
private final long mStartTime;
private final long mEndTime;

public CopyProgressReport(CopyJob job, boolean verbose)
{
mVerbose = verbose;
mJobId = job.mJobId;
mJobState = job.mState;
mCheckContent = job.mCheckContent;
mProcessedFileCount = job.mProcessedFileCount.get();
Expand Down Expand Up @@ -609,11 +629,23 @@ public CopyProgressReport(CopyJob job, boolean verbose)
}
mFailureReason = job.mFailedReason.orElse(null);
mFailedFileCount = job.mFailedFiles.size();
mSkippedFileCount = job.mCurrentSkipCount.get();
if (verbose && mFailedFileCount > 0) {
mFailedFilesWithReasons = job.mFailedFiles;
} else {
mFailedFilesWithReasons = Collections.emptyMap();
}
mStartTime = job.mStartTime;
if (mJobState == JobState.SUCCEEDED || mJobState == JobState.FAILED) {
if (job.mEndTime.isPresent()) {
mEndTime = job.mEndTime.getAsLong();
} else {
throw new InternalRuntimeException(
String.format("No end time in ending state %s", mJobState));
}
} else {
mEndTime = 0;
}
}

public String getReport(JobProgressReportFormat format)
Expand All @@ -632,29 +664,44 @@ public String getReport(JobProgressReportFormat format)
private String getTextReport() {
StringBuilder progress = new StringBuilder();
progress.append(
format("\tSettings:\tcheck-content: %s%n", mCheckContent));
progress.append(format("\tJob State: %s%s%n", mJobState,
mFailureReason == null
? "" : format(
" (%s: %s)",
mFailureReason.getClass().getName(),
mFailureReason.getMessage())));
format("\tSettings: \"check-content: %s\"%n", mCheckContent));
progress.append(format("\tJob Submitted: %s%n", new Date(mStartTime)));
progress.append(format("\tJob Id: %s%n", mJobId));
if (mJobState == JobState.SUCCEEDED || mJobState == JobState.FAILED) {
progress.append(format("\tJob State: %s%s, finished at %s%n", mJobState,
mFailureReason == null
? "" : format(
" (%s: %s)",
mFailureReason.getClass().getName(),
mFailureReason.getMessage()),
new Date(mEndTime)));
} else {
progress.append(format("\tJob State: %s%s%n", mJobState,
mFailureReason == null
? "" : format(
" (%s: %s)",
mFailureReason.getClass().getName(),
mFailureReason.getMessage())));
}
if (mVerbose && mFailureReason != null) {
for (StackTraceElement stack : mFailureReason.getStackTrace()) {
progress.append(format("\t\t%s%n", stack.toString()));
}
}
progress.append(format("\tFiles Processed: %d%n", mProcessedFileCount));
progress.append(format("\tBytes Copied: %s%s%n",
FormatUtils.getSizeFromBytes(mByteCount),
progress.append(format("\tFiles qualified%s: %d%s%n",
mJobState == JobState.RUNNING ? " so far" : "", mProcessedFileCount,
mTotalByteCount == null
? "" : format(" out of %s", FormatUtils.getSizeFromBytes(mTotalByteCount))));
? "" : format(", %s", FormatUtils.getSizeFromBytes(mTotalByteCount))));
progress.append(format("\tFiles Failed: %s%n", mFailedFileCount));
progress.append(format("\tFiles Skipped: %s%n", mSkippedFileCount));
progress.append(format("\tFiles Succeeded: %s%n", mProcessedFileCount
- mFailedFileCount - mSkippedFileCount));
progress.append(format("\tBytes Copied: %s%n", FormatUtils.getSizeFromBytes(mByteCount)));
if (mThroughput != null) {
progress.append(format("\tThroughput: %s/s%n",
FormatUtils.getSizeFromBytes(mThroughput)));
}
progress.append(format("\tFiles failure rate: %.2f%%%n", mFailurePercentage));
progress.append(format("\tFiles Failed: %s%n", mFailedFileCount));
if (mVerbose && !mFailedFilesWithReasons.isEmpty()) {
mFailedFilesWithReasons.forEach((fileName, reason) ->
progress.append(format("\t\t%s: %s%n", fileName, reason)));
Expand Down Expand Up @@ -683,6 +730,8 @@ private String getJsonReport() {
MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_FILE_COUNT.getName());
public static final Counter COPY_FAIL_FILE_COUNT =
MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_FAIL_FILE_COUNT.getName());
public static final Counter COPY_SKIP_FILE_COUNT =
MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_SKIP_FILE_COUNT.getName());
public static final Counter COPY_SIZE =
MetricsSystem.counter(MetricKey.MASTER_JOB_COPY_SIZE.getName());
public static final Meter COPY_RATE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -578,6 +579,8 @@ private static class MoveProgressReport {
private final long mFailedFileCount;
private final Map<String, String> mFailedFilesWithReasons;
private final String mJobId;
private final long mStartTime;
private final long mEndTime;

public MoveProgressReport(MoveJob job, boolean verbose)
{
Expand Down Expand Up @@ -616,6 +619,17 @@ public MoveProgressReport(MoveJob job, boolean verbose)
} else {
mFailedFilesWithReasons = Collections.emptyMap();
}
mStartTime = job.mStartTime;
if (mJobState == JobState.SUCCEEDED || mJobState == JobState.FAILED) {
if (job.mEndTime.isPresent()) {
mEndTime = job.mEndTime.getAsLong();
} else {
throw new InternalRuntimeException(
String.format("No end time in ending state %s", mJobState));
}
} else {
mEndTime = 0;
}
}

public String getReport(JobProgressReportFormat format)
Expand All @@ -634,30 +648,42 @@ public String getReport(JobProgressReportFormat format)
private String getTextReport() {
StringBuilder progress = new StringBuilder();
progress.append(
format("\tSettings:\tcheck-content: %s%n", mCheckContent));
format("\tSettings: \"check-content: %s\"%n", mCheckContent));
progress.append(format("\tJob Submitted: %s%n", new Date(mStartTime)));
progress.append(format("\tJob Id: %s%n", mJobId));
progress.append(format("\tJob State: %s%s%n", mJobState,
mFailureReason == null
? "" : format(
" (%s: %s)",
mFailureReason.getClass().getName(),
mFailureReason.getMessage())));
if (mJobState == JobState.SUCCEEDED || mJobState == JobState.FAILED) {
progress.append(format("\tJob State: %s%s, finished at %s%n", mJobState,
mFailureReason == null
? "" : format(
" (%s: %s)",
mFailureReason.getClass().getName(),
mFailureReason.getMessage()),
new Date(mEndTime)));
} else {
progress.append(format("\tJob State: %s%s%n", mJobState,
mFailureReason == null
? "" : format(
" (%s: %s)",
mFailureReason.getClass().getName(),
mFailureReason.getMessage())));
}
if (mVerbose && mFailureReason != null) {
for (StackTraceElement stack : mFailureReason.getStackTrace()) {
progress.append(format("\t\t%s%n", stack.toString()));
}
}
progress.append(format("\tFiles Processed: %d%n", mProcessedFileCount));
progress.append(format("\tBytes Moved: %s%s%n",
FormatUtils.getSizeFromBytes(mByteCount),
progress.append(format("\tFiles qualified%s: %d%s%n",
mJobState == JobState.RUNNING ? " so far" : "", mProcessedFileCount,
mTotalByteCount == null
? "" : format(" out of %s", FormatUtils.getSizeFromBytes(mTotalByteCount))));
? "" : format(", %s", FormatUtils.getSizeFromBytes(mTotalByteCount))));
progress.append(format("\tFiles Failed: %s%n", mFailedFileCount));
progress.append(format("\tFiles Succeeded: %s%n", mProcessedFileCount - mFailedFileCount));
progress.append(format("\tBytes Moved: %s%n", FormatUtils.getSizeFromBytes(mByteCount)));
if (mThroughput != null) {
progress.append(format("\tThroughput: %s/s%n",
FormatUtils.getSizeFromBytes(mThroughput)));
}
progress.append(format("\tFiles failure rate: %.2f%%%n", mFailurePercentage));
progress.append(format("\tFiles Failed: %s%n", mFailedFileCount));
if (mVerbose && !mFailedFilesWithReasons.isEmpty()) {
mFailedFilesWithReasons.forEach((fileName, reason) ->
progress.append(format("\t\t%s: %s%n", fileName, reason)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,32 +119,45 @@ public void testProgressReport() throws Exception {
OptionalLong.empty(), false, false, false, files, Optional.empty()));
when(job.getDurationInSec()).thenReturn(0L);
job.setJobState(JobState.RUNNING, false);
job.setStartTime(1690000000000L);
List<Route> nextRoutes = job.getNextRoutes(25);
job.addCopiedBytes(640 * Constants.MB);
String expectedTextReport = "\tSettings:\tcheck-content: false\n"
String expectedTextReport = "\tSettings: \"check-content: false\"\n"
+ "\tJob Submitted: Sat Jul 22 04:26:40.0 UTC 2023\n"
+ "\tJob Id: 1\n"
+ "\tJob State: RUNNING\n"
+ "\tFiles Processed: 25\n"
+ "\tBytes Copied: 640.00MB out of 31.25GB\n"
+ "\tFiles failure rate: 0.00%\n"
+ "\tFiles Failed: 0\n";
+ "\tFiles qualified so far: 25, 31.25GB\n"
+ "\tFiles Failed: 0\n"
+ "\tFiles Skipped: 0\n"
+ "\tFiles Succeeded: 25\n"
+ "\tBytes Copied: 640.00MB\n"
+ "\tFiles failure rate: 0.00%\n";
assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, false));
assertEquals(expectedTextReport, job.getProgress(JobProgressReportFormat.TEXT, true));
String expectedJsonReport = "{\"mVerbose\":false,\"mJobState\":\"RUNNING\","
+ "\"mCheckContent\":false,\"mProcessedFileCount\":25,"
+ "\"mByteCount\":671088640,\"mTotalByteCount\":33554432000,"
+ "\"mFailurePercentage\":0.0,\"mFailedFileCount\":0,\"mFailedFilesWithReasons\":{}}";
+ "\"mFailurePercentage\":0.0,\"mFailedFileCount\":0,\"mSkippedFileCount\":0,"
+ "\"mFailedFilesWithReasons\":{},\"mJobId\":\"1\",\"mStartTime\":1690000000000,"
+ "\"mEndTime\":0}";
assertEquals(expectedJsonReport, job.getProgress(JobProgressReportFormat.JSON, false));
job.addFailure(nextRoutes.get(0).getSrc(), "Test error 1", 2);
job.addFailure(nextRoutes.get(4).getSrc(), "Test error 2", 2);
job.addFailure(nextRoutes.get(10).getSrc(), "Test error 3", 2);
job.failJob(new InternalRuntimeException("test"));
job.setEndTime(1700000000000L);
assertEquals(JobState.FAILED, job.getJobState());
String expectedTextReportWithError = "\tSettings:\tcheck-content: false\n"
+ "\tJob State: FAILED (alluxio.exception.runtime.InternalRuntimeException: test)\n"
+ "\tFiles Processed: 25\n"
+ "\tBytes Copied: 640.00MB out of 31.25GB\n"
+ "\tFiles failure rate: 12.00%\n"
+ "\tFiles Failed: 3\n";
String expectedTextReportWithError = "\tSettings: \"check-content: false\"\n"
+ "\tJob Submitted: Sat Jul 22 04:26:40.0 UTC 2023\n"
+ "\tJob Id: 1\n"
+ "\tJob State: FAILED (alluxio.exception.runtime.InternalRuntimeException: test), "
+ "finished at Tue Nov 14 22:13:20 UTC 2023\n"
+ "\tFiles qualified: 25, 31.25GB\n"
+ "\tFiles Failed: 3\n"
+ "\tFiles Skipped: 0\n"
+ "\tFiles Succeeded: 22\n"
+ "\tBytes Copied: 640.00MB\n"
+ "\tFiles failure rate: 12.00%\n";
assertEquals(expectedTextReportWithError,
job.getProgress(JobProgressReportFormat.TEXT, false));
String textReport = job.getProgress(JobProgressReportFormat.TEXT, true);
Expand Down
Loading

0 comments on commit 63dc5b0

Please sign in to comment.