diff --git a/dora/job/server/src/main/java/alluxio/job/plan/stress/StressBenchDefinition.java b/dora/job/server/src/main/java/alluxio/job/plan/stress/StressBenchDefinition.java
index 38796a45c537..2c45ccf68ba7 100644
--- a/dora/job/server/src/main/java/alluxio/job/plan/stress/StressBenchDefinition.java
+++ b/dora/job/server/src/main/java/alluxio/job/plan/stress/StressBenchDefinition.java
@@ -127,8 +127,10 @@ public String runTask(StressBenchConfig config, ArrayList<String> args,
       RunTaskContext runTaskContext) throws Exception {
     List<String> command = new ArrayList<>(3 + config.getArgs().size());
     command.add(Configuration.get(PropertyKey.HOME) + "/bin/alluxio");
-    command.add("runClass");
+    command.add("exec");
+    command.add("class");
     command.add(config.getClassName());
+    command.add("--");

     // the cluster will run distributed tasks
     command.add(BaseParameters.DISTRIBUTED_FLAG);
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPoint.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPoint.java
new file mode 100644
index 000000000000..e5088fd997a4
--- /dev/null
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPoint.java
@@ -0,0 +1,53 @@
+package alluxio.stress.worker;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.List;
+
+@JsonDeserialize(using = WorkerBenchCoarseDataPointDeserializer.class)
+public class WorkerBenchCoarseDataPoint {
+  // properties: workerId, threadId, sliceId, records
+  @JsonProperty("wid")
+  private Long mWid;
+  @JsonProperty("tid")
+  private Long mTid;
+  @JsonProperty("data")
+  private List<List<WorkerBenchDataPoint>> mData;
+
+  // constructor
+  public WorkerBenchCoarseDataPoint(Long workerID, Long threadID, List<List<WorkerBenchDataPoint>> data) {
+    mWid = workerID;
+    mTid = threadID;
+    mData = data;
+  }
+
+  // getter & setters
+  public Long getWid() {
+    return mWid;
+  }
+
+  public void setWid(Long wid) {
+    mWid = wid;
+  }
+
+  public Long getTid() {
+    return mTid;
+  }
+
+  public void setTid(Long tid) {
+    mTid = tid;
+  }
+
+  public List<List<WorkerBenchDataPoint>> getData() {
+    return mData;
+  }
+
+  public void setData(List<List<WorkerBenchDataPoint>> data) {
+    mData = data;
+  }
+
+  public void addDataPoints(List<WorkerBenchDataPoint> data) {
+    mData.add(data);
+  }
+}
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPointDeserializer.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPointDeserializer.java
new file mode 100644
index 000000000000..abc0f67b2675
--- /dev/null
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchCoarseDataPointDeserializer.java
@@ -0,0 +1,38 @@
+package alluxio.stress.worker;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A deserializer converting {@link WorkerBenchCoarseDataPoint} from JSON.
+ */
+public class WorkerBenchCoarseDataPointDeserializer extends JsonDeserializer<WorkerBenchCoarseDataPoint> {
+  @Override
+  public WorkerBenchCoarseDataPoint deserialize(JsonParser parser, DeserializationContext ctx)
+      throws IOException {
+    ObjectMapper mapper = (ObjectMapper) parser.getCodec();
+    JsonNode node = parser.getCodec().readTree(parser);
+    Long wId = node.get("wid").asLong();
+    Long tId = node.get("tid").asLong();
+    List<List<WorkerBenchDataPoint>> data = new ArrayList<>();
+    JsonNode dataNode = node.get("data");
+    if (dataNode != null) {
+      for (JsonNode listNode: dataNode){
+        List<WorkerBenchDataPoint> dataPoints = new ArrayList<>();
+        for (JsonNode subListNode: listNode) {
+          WorkerBenchDataPoint dataPoint = mapper.treeToValue(subListNode, WorkerBenchDataPoint.class);
+          dataPoints.add(dataPoint);
+        }
+        data.add(dataPoints);
+      }
+    }
+    return new WorkerBenchCoarseDataPoint(wId, tId, data);
+  }
+}
\ No newline at end of file
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPoint.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPoint.java
index 9109e73f06ed..409203d3399b 100644
--- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPoint.java
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPoint.java
@@ -14,7 +14,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.google.common.base.MoreObjects;

 /**
  * One data point captures the information we collect from one I/O operation to a worker.
@@ -22,52 +21,25 @@
  */
 @JsonDeserialize(using = WorkerBenchDataPointDeserializer.class)
 public class WorkerBenchDataPoint {
-  @JsonProperty("workerID")
-  public String mWorkerID;
-  @JsonProperty("threadID")
-  public long mThreadID;
-
   @JsonProperty("duration")
   public long mDuration;
-  @JsonProperty("start")
+  @JsonProperty("startMs")
   public long mStartMs;
   @JsonProperty("ioBytes")
   public long mIOBytes;

   /**
-   * @param workerID the worker this I/O operation reads
-   * @param threadID the thread performing the I/O
    * @param startMs start timestamp of the I/O
    * @param duration duration of the file read operation
    * @param ioBytes bytes read
    */
   @JsonCreator
-  public WorkerBenchDataPoint(@JsonProperty("workerID") String workerID,
-      @JsonProperty("threadID") long threadID,
-      @JsonProperty("start") long startMs,
-      @JsonProperty("duration") long duration,
-      @JsonProperty("ioBytes") long ioBytes) {
-    mWorkerID = workerID;
-    mThreadID = threadID;
+  public WorkerBenchDataPoint(long startMs, long duration, long ioBytes) {
     mStartMs = startMs;
     mDuration = duration;
     mIOBytes = ioBytes;
   }

-  /**
-   * @return worker ID
-   */
-  public String getWorkerID() {
-    return mWorkerID;
-  }
-
-  /**
-   * @return thread ID
-   */
-  public long getThreadID() {
-    return mThreadID;
-  }
-
   /**
    * @return duration in ms
    */
@@ -89,20 +61,6 @@ public long getIOBytes() {
     return mIOBytes;
   }

-  /**
-   * @param workerID worker ID
-   */
-  public void setWorkerID(String workerID) {
-    mWorkerID = workerID;
-  }
-
-  /**
-   * @param threadID the thread ID
-   */
-  public void setThreadID(long threadID) {
-    mThreadID = threadID;
-  }
-
   /**
    * @param duration duration in ms
    */
@@ -123,13 +81,4 @@ public void setStartMs(long startMs) {
   public void setIOBytes(long ioBytes) {
     mIOBytes = ioBytes;
   }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("threadID", mThreadID)
-        .add("ioBytes", mIOBytes)
-        .add("duration", mDuration)
-        .toString();
-  }
 }
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPointDeserializer.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPointDeserializer.java
index 02859bff36b3..34adc9a0fceb 100644
--- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPointDeserializer.java
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchDataPointDeserializer.java
@@ -27,9 +27,7 @@ public WorkerBenchDataPoint deserialize(JsonParser parser, DeserializationContex
       throws IOException {
     JsonNode node = parser.getCodec().readTree(parser);
     return new WorkerBenchDataPoint(
-        node.get("workerID").asText(), node.get("threadID").asLong(),
-        node.get("startMs").asLong(), node.get("duration").asLong(),
-        node.get("ioBytes").asLong()
+        node.get("startMs").asLong(), node.get("duration").asLong(), node.get("ioBytes").asLong()
     );
   }
 }
diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchSummary.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchSummary.java
index 3194c2ae8a36..e5201a886449 100644
--- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchSummary.java
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchSummary.java
@@ -14,15 +14,12 @@
 import alluxio.Constants;
 import alluxio.collections.Pair;
 import alluxio.stress.Parameters;
-import alluxio.stress.StressConstants;
 import alluxio.stress.Summary;
 import alluxio.stress.common.GeneralBenchSummary;
 import alluxio.stress.graph.Graph;
 import alluxio.stress.graph.LineGraph;
-import alluxio.util.FormatUtils;

 import com.google.common.base.Splitter;
-import org.HdrHistogram.Histogram;

 import java.util.ArrayList;
 import java.util.HashMap;
@@ -39,7 +36,7 @@ public final class WorkerBenchSummary extends GeneralBenchSummary<WorkerBenchTas
-  private List<Long> mDurationPercentiles;
+  private List<Long> mThroughputPercentiles;

   /**
    * Creates an instance.
@@ -47,7 +44,7 @@ public final class WorkerBenchSummary extends GeneralBenchSummary<WorkerBenchTas
     mNodeResults = new HashMap<>();
-    mDurationPercentiles = new ArrayList<>();
+    mThroughputPercentiles = new ArrayList<>();
   }

   /**
@@ -65,16 +62,17 @@ public WorkerBenchSummary(WorkerBenchTaskResult mergedTaskResults,
     mNodeResults = nodes;
     mThroughput = getIOMBps();

-    mDurationPercentiles = new ArrayList<>();
-    Histogram durationHistogram = new Histogram(
-        FormatUtils.parseTimeSize(mParameters.mDuration)
-            + FormatUtils.parseTimeSize(mParameters.mWarmup),
-        StressConstants.TIME_HISTOGRAM_PRECISION);
-    mergedTaskResults.getDataPoints().forEach(stat ->
-        durationHistogram.recordValue(stat.getDuration()));
-    for (int i = 0; i <= 100; i++) {
-      mDurationPercentiles.add(durationHistogram.getValueAtPercentile(i));
-    }
+// TODO: implement percentile calculation for coarse data points
+
+//    mThroughputPercentiles = new ArrayList<>();
+//    Histogram throughputHistogram = new Histogram(
+//        FormatUtils.parseSpaceSize(mParameters.mFileSize),
+//        StressConstants.TIME_HISTOGRAM_PRECISION);
+//    mergedTaskResults.getDataPoints().forEach(stat ->
+//        throughputHistogram.recordValue(stat.getInThroughput()));
+//    for (int i = 0; i <= 100; i++) {
+//      mThroughputPercentiles.add(throughputHistogram.getValueAtPercentile(i));
+//    }
   }

   /**
@@ -150,15 +148,15 @@ public void setIOBytes(long IOBytes) {
   /**
    * @return 0~100 percentiles of recorded durations
    */
-  public List<Long> getDurationPercentiles() {
-    return mDurationPercentiles;
+  public List<Long> getThroughputPercentiles() {
+    return mThroughputPercentiles;
   }

   /**
    * @param percentiles a list of calculated percentiles from recorded durations
    */
-  public void setDurationPercentiles(List<Long> percentiles) {
-    mDurationPercentiles = percentiles;
+  public void setThroughputPercentiles(List<Long> percentiles) {
+    mThroughputPercentiles = percentiles;
   }

   @Override
@@ -185,7 +183,7 @@ public List<Graph> generate(List<? extends Summary> results) {
       Pair<List<String>, List<String>> fieldNames = Parameters.partitionFieldNames(
           summaries.stream().map(x -> x.mParameters).collect(Collectors.toList()));

-      // Split up common description into 100 character chunks, for the sub title
+      // Split up common description into 100 character chunks, for the subtitle
       List<String> subTitle = new ArrayList<>(Splitter.fixedLength(100).splitToList(
           summaries.get(0).mParameters.getDescription(fieldNames.getFirst())));

diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchTaskResult.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchTaskResult.java
index ee00f4b4843a..936728869111 100644
--- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchTaskResult.java
+++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchTaskResult.java
@@ -12,11 +12,8 @@
 package alluxio.stress.worker;

 import alluxio.stress.BaseParameters;
-import alluxio.stress.StressConstants;
 import alluxio.stress.TaskResult;
-import alluxio.util.FormatUtils;

-import org.HdrHistogram.Histogram;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -39,8 +36,8 @@ public final class WorkerBenchTaskResult implements TaskResult {
   private long mEndMs;
   private long mIOBytes;
   private List<String> mErrors;
-  private List<WorkerBenchDataPoint> mDataPoints;
-  private List<Long> mDurationPercentiles;
+  private final List<WorkerBenchCoarseDataPoint> mDataPoints;
+  private List<Long> mThroughputPercentiles;

   /**
    * Creates an instance.
@@ -49,7 +46,7 @@ public WorkerBenchTaskResult() {
     // Default constructor required for json deserialization
     mErrors = new ArrayList<>();
     mDataPoints = new ArrayList<>();
-    mDurationPercentiles = new ArrayList<>();
+    mThroughputPercentiles = new ArrayList<>();
   }

   /**
@@ -172,28 +169,29 @@ public void setErrors(List<String> errors) {
   /**
    * @return 100 percentiles for durations of all I/O operations
    */
-  public List<Long> getDurationPercentiles() {
-    return mDurationPercentiles;
+  public List<Long> getThroughputPercentiles() {
+    return mThroughputPercentiles;
   }

   /**
    * @param percentiles 100 percentiles for durations of all I/O operations
    */
-  public void setDurationPercentiles(List<Long> percentiles) {
-    mDurationPercentiles = percentiles;
+  public void setThroughputPercentiles(List<Long> percentiles) {
+    mThroughputPercentiles = percentiles;
   }

   /**
    * From the collected operation data, calculates 100 percentiles.
    */
   public void calculatePercentiles() {
-    Histogram durationHistogram = new Histogram(
-        FormatUtils.parseTimeSize(mParameters.mDuration),
-        StressConstants.TIME_HISTOGRAM_PRECISION);
-    mDataPoints.forEach(stat -> durationHistogram.recordValue(stat.getDuration()));
-    for (int i = 0; i <= 100; i++) {
-      mDurationPercentiles.add(durationHistogram.getValueAtPercentile(i));
-    }
+    // TODO: implement percentile calculation for coarse data points
+//    Histogram throughputHistogram = new Histogram(
+//        FormatUtils.parseSpaceSize(mParameters.mFileSize),
+//        StressConstants.TIME_HISTOGRAM_PRECISION);
+//    mDataPoints.forEach(stat -> throughputHistogram.recordValue(stat.getInThroughput()));
+//    for (int i = 0; i <= 100; i++) {
+//      mThroughputPercentiles.add(throughputHistogram.getValueAtPercentile(i));
+//    }
   }

   /**
@@ -206,21 +204,21 @@ public void addErrorMessage(String errMessage) {
   /**
    * @return all data points for I/O operations
    */
-  public List<WorkerBenchDataPoint> getDataPoints() {
+  public List<WorkerBenchCoarseDataPoint> getDataPoints() {
     return mDataPoints;
   }

   /**
    * @param point one data point for one I/O operation
    */
-  public void addDataPoint(WorkerBenchDataPoint point) {
+  public void addDataPoint(WorkerBenchCoarseDataPoint point) {
     mDataPoints.add(point);
   }

   /**
    * @param stats data points for all recorded I/O operations
    */
-  public void addDataPoints(Collection<WorkerBenchDataPoint> stats) {
+  public void addDataPoints(Collection<WorkerBenchCoarseDataPoint> stats) {
     mDataPoints.addAll(stats);
   }

@@ -244,11 +242,10 @@ public WorkerBenchSummary aggregate(Iterable<WorkerBenchTaskResult> results) thr
     WorkerBenchTaskResult mergedTaskResult = new WorkerBenchTaskResult();

     for (WorkerBenchTaskResult result : results) {
-      result.calculatePercentiles();
+      // result.calculatePercentiles();
       mergedTaskResult.merge(result);
-      LOG.info("Test results from worker {} has been merged, the data points are now cleared.",
-          result.getBaseParameters().mId);
-      result.clearDataPoints();
+      LOG.info("Test results from worker {} has been merged.", result.getBaseParameters().mId);
+      // result.clearDataPoints();
       nodeResults.put(result.getBaseParameters().mId, result);
     }

diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/Benchmark.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/Benchmark.java
index 7cd051dfeef5..e4e637edfdb6 100644
--- a/dora/stress/shell/src/main/java/alluxio/stress/cli/Benchmark.java
+++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/Benchmark.java
@@ -193,8 +193,10 @@ protected String runSingleTask(String[] args) throws Exception {
     // Spawn a new process
     List<String> command = new ArrayList<>();
     command.add(conf.get(PropertyKey.HOME) + "/bin/alluxio");
-    command.add("runClass");
+    command.add("exec");
+    command.add("class");
     command.add(className);
+    command.add("--");
     command.addAll(Arrays.asList(args));
     command.add(BaseParameters.IN_PROCESS_FLAG);
     command.addAll(mBaseParameters.mJavaOpts.stream().map(String::trim)
diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientIOBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientIOBench.java
index 3c2238bab7fd..f4de122c0f9c 100644
--- a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientIOBench.java
+++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientIOBench.java
@@ -103,10 +103,10 @@ public String getBenchDescription() {
         "# This test will run create a 500MB file with block size 15KB on 1 worker,",
         "# then test the ReadArray operation for 30s and calculate the throughput after 10s "
             + "warmup.",
-        "$ bin/alluxio runClass alluxio.stress.cli.client.StressClientIOBench --operation Write "
-            + "--base alluxio:///stress-client-io-base --file-size 500m --buffer-size 64k "
+        "$ bin/alluxio exec class alluxio.stress.cli.client.StressClientIOBench -- --operation "
+            + "Write --base alluxio:///stress-client-io-base --file-size 500m --buffer-size 64k "
             + "--block-size 16k --write-num-workers 1 --cluster --cluster-limit 1",
-        "$ bin/alluxio runClass alluxio.stress.cli.client.StressClientIOBench --operation "
+        "$ bin/alluxio exec class alluxio.stress.cli.client.StressClientIOBench -- --operation "
             + "ReadArray --base alluxio:///stress-client-io-base --file-size 500m --buffer-size "
             + "64k --block-size 16k --warmup 10s --duration 30s --write-num-workers 1 --cluster "
             + "--cluster-limit 1\n"));
diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java
index ad00754cfea9..377081f41afd 100644
--- a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java
+++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java
@@ -22,6 +22,7 @@
 import alluxio.stress.BaseParameters;
 import alluxio.stress.cli.AbstractStressBench;
 import alluxio.stress.common.FileSystemParameters;
+import alluxio.stress.worker.WorkerBenchCoarseDataPoint;
 import alluxio.stress.worker.WorkerBenchDataPoint;
 import alluxio.stress.worker.WorkerBenchParameters;
 import alluxio.stress.worker.WorkerBenchTaskResult;
@@ -42,11 +43,7 @@

 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -123,7 +120,7 @@ public String getBenchDescription() {
             + "be prepared for each test thread."
             + "# The threads will keeping reading for 30s including a 10s warmup."
             + "# So the result captures I/O performance from the last 20s.",
-        "$ bin/alluxio runClass alluxio.stress.cli.worker.StressWorkerBench \\\n"
+        "$ bin/alluxio exec class alluxio.stress.cli.worker.StressWorkerBench -- \\\n"
            + "--threads 32 --base alluxio:///stress-worker-base --file-size 100m \\\n"
            + "--warmup 10s --duration 30s --cluster\n"
     ));
@@ -455,35 +452,60 @@ private void runInternal() throws Exception {
       CommonUtils.sleepMs(waitMs);
       SAMPLING_LOG.info("Test started and recording will be started after the warm up at {}",
           CommonUtils.convertMsToDate(recordMs, dateFormat));
+      WorkerBenchCoarseDataPoint dp = new WorkerBenchCoarseDataPoint(
+          Long.valueOf(mBaseParameters.mIndex),
+          Thread.currentThread().getId(),
+          new ArrayList<>());
+      List<WorkerBenchDataPoint> dpList = new ArrayList<>();
+      int lastSlice = 0;
+
       while (!Thread.currentThread().isInterrupted()
           && CommonUtils.getCurrentMs() < mContext.getEndMs()) {
         // Keep reading the same file
-        WorkerBenchDataPoint dataPoint = applyOperation();
-        long currentMs = CommonUtils.getCurrentMs();
-        // Start recording after the warmup
-        if (currentMs > recordMs) {
-          mResult.addDataPoint(dataPoint);
-          if (dataPoint.getIOBytes() > 0) {
-            mResult.incrementIOBytes(dataPoint.getIOBytes());
+        long startMs = CommonUtils.getCurrentMs() - recordMs;
+        long bytesRead = applyOperation();
+        long duration = CommonUtils.getCurrentMs() - recordMs - startMs;
+        if (startMs > recordMs) {
+          if (bytesRead > 0) {
+            // TODO: configurable slice size (must smaller than duration, divisible to duration)
+            int currentSlice = (int)(startMs / 10000);
+            while (currentSlice > lastSlice){
+              LOG.info("CurrentSlice: {}, LastSlice: {}, adding list to dp.", currentSlice, lastSlice);
+              dp.addDataPoints(dpList);
+              dpList.clear();
+              lastSlice++;
+            }
+            dpList.add(new WorkerBenchDataPoint(startMs, duration, bytesRead));
          } else {
            LOG.warn("Thread for file {} read 0 bytes from I/O", mFilePaths[mTargetFileIndex]);
          }
        } else {
-          SAMPLING_LOG.info("Ignored data point during warmup: {}", dataPoint);
+          SAMPLING_LOG.info("Ignored record during warmup: {}, {}", startMs, bytesRead);
        }
      }
+
+      // TODO: configurable slice size (must smaller than duration, divisible to duration)
+      int finalSlice = (int)(FormatUtils.parseTimeSize(mParameters.mDuration) / 10000);
+      LOG.info("FinalSlice: {}", finalSlice);
+      while (finalSlice > lastSlice){
+        LOG.info("FinalSlice: {}, LastSlice: {}, adding list to dp.", finalSlice, lastSlice);
+        dp.addDataPoints(dpList);
+        dpList.clear();
+        lastSlice++;
+      }
+
+      mResult.addDataPoint(dp);
    }

    /**
     * Read the file by the offset and length based on the given index.
     * @return the actual red byte number
     */
-    private WorkerBenchDataPoint applyOperation() throws IOException {
+    private long applyOperation() throws IOException {
      Path filePath = mFilePaths[mTargetFileIndex];
      int offset = mOffsets[mTargetFileIndex];
      int length = mLengths[mTargetFileIndex];

-      long startOperation = CommonUtils.getCurrentMs();
      if (mInStream == null) {
        mInStream = mFs.open(filePath);
      }
@@ -515,10 +537,7 @@ private WorkerBenchDataPoint applyOperation() throws IOException {
          }
        }
      }
-      long endOperation = CommonUtils.getCurrentMs();
-      return new WorkerBenchDataPoint(
-          mBaseParameters.mIndex, Thread.currentThread().getId(),
-          startOperation, endOperation - startOperation, bytesRead);
+      return bytesRead;
    }

    private void closeInStream() {