diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java index 3879eb831014..2ada4c6569d9 100644 --- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java +++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java @@ -71,11 +71,14 @@ public final class WorkerBenchParameters extends FileSystemParameters { public Integer mRandomSeed = 1; @Parameter(names = {"--random-max-length"}, - description = "The random max length upper bound") + description = "The random max length upper bound." + + "As this InputStream.read() only accept read offset and" + + " length as Integer, so this max length must smaller than 2.1GB.") public String mRandomMaxReadLength = "4m"; @Parameter(names = {"--random-min-length"}, - description = "The random max length upper bound") + description = "The random max length lower bound." + + "this random min length must not larger than random max length.") public String mRandomMinReadLength = "1m"; @Parameter(names = {"--free"}, diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java index daeea146dfa4..24837082a2db 100644 --- a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java +++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java @@ -47,9 +47,9 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** @@ -68,13 +68,17 @@ public class StressWorkerBench extends AbstractStressBench b ? a : b; } /** @@ -154,8 +158,6 @@ public void prepare() throws Exception { // and offsets mFilePaths = new Path[numFiles]; // set random offsets and lengths if enabled - mLengths = new Integer[numFiles]; - mOffsets = new Integer[numFiles]; generateTestFilePaths(basePath); @@ -205,16 +207,10 @@ public void prepare() throws Exception { * @param basePath base dir where the files should be prepared */ public void generateTestFilePaths(Path basePath) throws IOException { - int fileSize = (int) FormatUtils.parseSpaceSize(mParameters.mFileSize); int clusterSize = mBaseParameters.mClusterLimit; int threads = mParameters.mThreads; List workers = mFsContext.getCachedWorkers(); - Random rand = new Random(); - if (mParameters.mIsRandom) { - rand = new Random(mParameters.mRandomSeed); - } - for (int i = 0; i < clusterSize; i++) { BlockWorkerInfo localWorker = workers.get(i); LOG.info("Building file paths for worker {}", localWorker); @@ -223,19 +219,6 @@ public void generateTestFilePaths(Path basePath) throws IOException { int index = i * threads + j; mFilePaths[index] = filePath; - - // Continue init other aspects of the file read operation - // TODO(jiacheng): do we want a new randomness for every read? - if (mParameters.mIsRandom) { - int randomMin = (int) FormatUtils.parseSpaceSize(mParameters.mRandomMinReadLength); - int randomMax = (int) FormatUtils.parseSpaceSize(mParameters.mRandomMaxReadLength); - mOffsets[index] = randomNumInRange(rand, 0, fileSize - 1 - randomMin); - mLengths[index] = randomNumInRange(rand, randomMin, - Integer.min(fileSize - mOffsets[i], randomMax)); - } else { - mOffsets[index] = 0; - mLengths[index] = fileSize; - } } } LOG.info("{} file paths generated", mFilePaths.length); @@ -357,6 +340,16 @@ public void validateParams() throws Exception { throw new IllegalStateException(String.format("%s cannot be %s when %s option provided", FileSystemParameters.WRITE_TYPE_OPTION_NAME, WritePType.MUST_CACHE, "--free")); } + + if (FormatUtils.parseSpaceSize(mParameters.mRandomMaxReadLength) > Integer.MAX_VALUE) { + throw new IllegalArgumentException("mRandomReadMaxLength cannot be larger than 2.1G"); + } + + if (FormatUtils.parseSpaceSize(mParameters.mRandomMaxReadLength) + < FormatUtils.parseSpaceSize(mParameters.mRandomMinReadLength)) { + throw new IllegalArgumentException("mRandomReadMinLength must not larger" + + " than mRandomReadMaxLength"); + } } private static final class BenchContext { @@ -403,6 +396,9 @@ private final class BenchThread implements Callable { private final byte[] mBuffer; private final WorkerBenchTaskResult mResult; private final boolean mIsRandomRead; + private final long mRandomMax; + private final long mRandomMin; + private final long mFileSize; private FSDataInputStream mInStream; @@ -416,6 +412,9 @@ private BenchThread(BenchContext context, int targetFileIndex, FileSystem fs) { mResult.setParameters(mParameters); mResult.setBaseParameters(mBaseParameters); mIsRandomRead = mParameters.mIsRandom; + mRandomMin = FormatUtils.parseSpaceSize(mParameters.mRandomMinReadLength); + mRandomMax = FormatUtils.parseSpaceSize(mParameters.mRandomMaxReadLength); + mFileSize = FormatUtils.parseSpaceSize(mParameters.mFileSize); } @Override @@ -521,8 +520,6 @@ private void runInternal() throws Exception { */ private long applyOperation() throws IOException { Path filePath = mFilePaths[mTargetFileIndex]; - int offset = mOffsets[mTargetFileIndex]; - int length = mLengths[mTargetFileIndex]; if (mInStream == null) { mInStream = mFs.open(filePath); @@ -530,9 +527,12 @@ private long applyOperation() throws IOException { int bytesRead = 0; if (mIsRandomRead) { + long offset = randomNumInRange(0, mFileSize - 1 - mRandomMin); + long lengthMax = Math.min(mFileSize - offset, mRandomMax); + long length = randomNumInRange(mRandomMin, lengthMax); while (length > 0) { int actualReadLength = mInStream - .read(offset, mBuffer, 0, mBuffer.length); + .read(offset, mBuffer, 0, (int) minLong(mBuffer.length, length)); if (actualReadLength < 0) { closeInStream(); break;