Skip to content

Commit

Permalink
Improve the random read behavior in StressWorkerBench
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Improve the randomness of StressWorkerBench random read test, now each thread throw dice every time it trying to do a random read.

### Why are the changes needed?

In previous StressWorkerBench random read test each thread read from same offset and same length everytime, this cause low randomness.

### Does this PR introduce any user facing changes?

no
			pr-link: #18000
			change-id: cid-974afb58c022061e8a6d8d6894415a8bd292b764
  • Loading branch information
voddle authored Sep 22, 2023
1 parent 4e2a494 commit be6974c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ public final class WorkerBenchParameters extends FileSystemParameters {
public Integer mRandomSeed = 1;

@Parameter(names = {"--random-max-length"},
description = "The random max length upper bound")
description = "The random max length upper bound."
+ "As this InputStream.read() only accept read offset and"
+ " length as Integer, so this max length must smaller than 2.1GB.")
public String mRandomMaxReadLength = "4m";

@Parameter(names = {"--random-min-length"},
description = "The random max length upper bound")
description = "The random max length lower bound."
+ "this random min length must not larger than random max length.")
public String mRandomMinReadLength = "1m";

@Parameter(names = {"--free"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -68,13 +68,17 @@ public class StressWorkerBench extends AbstractStressBench<WorkerBenchTaskResult

private FileSystem[] mCachedFs;
private Path[] mFilePaths;
private Integer[] mOffsets;
private Integer[] mLengths;
private FileSystemContext mFsContext;

/** generate random number in range [min, max] (include both min and max).*/
private Integer randomNumInRange(Random rand, int min, int max) {
return rand.nextInt(max - min + 1) + min;
/**
* generate random number in range [min, max] (include both min and max).
*/
private long randomNumInRange(long min, long max) {
return ThreadLocalRandom.current().nextLong(min, max + 1) + min;
}

private long minLong(long a, long b) {
return a > b ? a : b;
}

/**
Expand Down Expand Up @@ -154,8 +158,6 @@ public void prepare() throws Exception {
// and offsets
mFilePaths = new Path[numFiles];
// set random offsets and lengths if enabled
mLengths = new Integer[numFiles];
mOffsets = new Integer[numFiles];

generateTestFilePaths(basePath);

Expand Down Expand Up @@ -205,16 +207,10 @@ public void prepare() throws Exception {
* @param basePath base dir where the files should be prepared
*/
public void generateTestFilePaths(Path basePath) throws IOException {
int fileSize = (int) FormatUtils.parseSpaceSize(mParameters.mFileSize);
int clusterSize = mBaseParameters.mClusterLimit;
int threads = mParameters.mThreads;
List<BlockWorkerInfo> workers = mFsContext.getCachedWorkers();

Random rand = new Random();
if (mParameters.mIsRandom) {
rand = new Random(mParameters.mRandomSeed);
}

for (int i = 0; i < clusterSize; i++) {
BlockWorkerInfo localWorker = workers.get(i);
LOG.info("Building file paths for worker {}", localWorker);
Expand All @@ -223,19 +219,6 @@ public void generateTestFilePaths(Path basePath) throws IOException {

int index = i * threads + j;
mFilePaths[index] = filePath;

// Continue init other aspects of the file read operation
// TODO(jiacheng): do we want a new randomness for every read?
if (mParameters.mIsRandom) {
int randomMin = (int) FormatUtils.parseSpaceSize(mParameters.mRandomMinReadLength);
int randomMax = (int) FormatUtils.parseSpaceSize(mParameters.mRandomMaxReadLength);
mOffsets[index] = randomNumInRange(rand, 0, fileSize - 1 - randomMin);
mLengths[index] = randomNumInRange(rand, randomMin,
Integer.min(fileSize - mOffsets[i], randomMax));
} else {
mOffsets[index] = 0;
mLengths[index] = fileSize;
}
}
}
LOG.info("{} file paths generated", mFilePaths.length);
Expand Down Expand Up @@ -357,6 +340,16 @@ public void validateParams() throws Exception {
throw new IllegalStateException(String.format("%s cannot be %s when %s option provided",
FileSystemParameters.WRITE_TYPE_OPTION_NAME, WritePType.MUST_CACHE, "--free"));
}

if (FormatUtils.parseSpaceSize(mParameters.mRandomMaxReadLength) > Integer.MAX_VALUE) {
throw new IllegalArgumentException("mRandomReadMaxLength cannot be larger than 2.1G");
}

if (FormatUtils.parseSpaceSize(mParameters.mRandomMaxReadLength)
< FormatUtils.parseSpaceSize(mParameters.mRandomMinReadLength)) {
throw new IllegalArgumentException("mRandomReadMinLength must not larger"
+ " than mRandomReadMaxLength");
}
}

private static final class BenchContext {
Expand Down Expand Up @@ -403,6 +396,9 @@ private final class BenchThread implements Callable<Void> {
private final byte[] mBuffer;
private final WorkerBenchTaskResult mResult;
private final boolean mIsRandomRead;
private final long mRandomMax;
private final long mRandomMin;
private final long mFileSize;

private FSDataInputStream mInStream;

Expand All @@ -416,6 +412,9 @@ private BenchThread(BenchContext context, int targetFileIndex, FileSystem fs) {
mResult.setParameters(mParameters);
mResult.setBaseParameters(mBaseParameters);
mIsRandomRead = mParameters.mIsRandom;
mRandomMin = FormatUtils.parseSpaceSize(mParameters.mRandomMinReadLength);
mRandomMax = FormatUtils.parseSpaceSize(mParameters.mRandomMaxReadLength);
mFileSize = FormatUtils.parseSpaceSize(mParameters.mFileSize);
}

@Override
Expand Down Expand Up @@ -521,18 +520,19 @@ private void runInternal() throws Exception {
*/
private long applyOperation() throws IOException {
Path filePath = mFilePaths[mTargetFileIndex];
int offset = mOffsets[mTargetFileIndex];
int length = mLengths[mTargetFileIndex];

if (mInStream == null) {
mInStream = mFs.open(filePath);
}

int bytesRead = 0;
if (mIsRandomRead) {
long offset = randomNumInRange(0, mFileSize - 1 - mRandomMin);
long lengthMax = Math.min(mFileSize - offset, mRandomMax);
long length = randomNumInRange(mRandomMin, lengthMax);
while (length > 0) {
int actualReadLength = mInStream
.read(offset, mBuffer, 0, mBuffer.length);
.read(offset, mBuffer, 0, (int) minLong(mBuffer.length, length));
if (actualReadLength < 0) {
closeInStream();
break;
Expand Down

0 comments on commit be6974c

Please sign in to comment.