Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add throughput distribution for StressWorkerBench #17992

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ public String runTask(StressBenchConfig config, ArrayList<String> args,
RunTaskContext runTaskContext) throws Exception {
List<String> command = new ArrayList<>(3 + config.getArgs().size());
command.add(Configuration.get(PropertyKey.HOME) + "/bin/alluxio");
command.add("runClass");
command.add("exec");
command.add("class");
command.add(config.getClassName());
command.add("--");

// the cluster will run distributed tasks
command.add(BaseParameters.DISTRIBUTED_FLAG);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package alluxio.stress.worker;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

import java.util.List;

/**
 * A coarse-grained data point that groups the records collected by one worker thread.
 * The outer list of {@link #getData()} is indexed by time slice; each inner list holds
 * the {@link WorkerBenchDataPoint}s recorded during that slice.
 */
@JsonDeserialize(using = WorkerBenchCoarseDataPointDeserializer.class)
public class WorkerBenchCoarseDataPoint {
  // properties: workerId, threadId, sliceId, records
  @JsonProperty("wid")
  private Long mWid;
  @JsonProperty("tid")
  private Long mTid;
  @JsonProperty("data")
  private List<List<WorkerBenchDataPoint>> mData;

  /**
   * Creates a coarse data point for one worker thread.
   *
   * @param workerId ID of the worker the records belong to
   * @param threadId ID of the thread that produced the records
   * @param data data points grouped into time slices
   */
  public WorkerBenchCoarseDataPoint(Long workerId, Long threadId,
      List<List<WorkerBenchDataPoint>> data) {
    mWid = workerId;
    mTid = threadId;
    mData = data;
  }

  /**
   * @return the worker ID
   */
  public Long getWid() {
    return mWid;
  }

  /**
   * @param wid the worker ID
   */
  public void setWid(Long wid) {
    mWid = wid;
  }

  /**
   * @return the thread ID
   */
  public Long getTid() {
    return mTid;
  }

  /**
   * @param tid the thread ID
   */
  public void setTid(Long tid) {
    mTid = tid;
  }

  /**
   * @return all data points, grouped into time slices
   */
  public List<List<WorkerBenchDataPoint>> getData() {
    return mData;
  }

  /**
   * @param data the data points to replace the current ones, grouped into time slices
   */
  public void setData(List<List<WorkerBenchDataPoint>> data) {
    mData = data;
  }

  /**
   * Appends one time slice of data points to this coarse data point.
   *
   * @param data the data points of one time slice
   */
  public void addDataPoints(List<WorkerBenchDataPoint> data) {
    mData.add(data);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package alluxio.stress.worker;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * A deserializer converting {@link WorkerBenchCoarseDataPoint} from JSON.
 */
public class WorkerBenchCoarseDataPointDeserializer
    extends JsonDeserializer<WorkerBenchCoarseDataPoint> {
  @Override
  public WorkerBenchCoarseDataPoint deserialize(JsonParser parser, DeserializationContext ctx)
      throws IOException {
    ObjectMapper mapper = (ObjectMapper) parser.getCodec();
    JsonNode node = parser.getCodec().readTree(parser);
    // Guard against missing ids instead of throwing an NPE on malformed input,
    // mirroring the existing null check on the "data" field below.
    JsonNode widNode = node.get("wid");
    Long wid = widNode == null ? null : widNode.asLong();
    JsonNode tidNode = node.get("tid");
    Long tid = tidNode == null ? null : tidNode.asLong();
    List<List<WorkerBenchDataPoint>> data = new ArrayList<>();
    JsonNode dataNode = node.get("data");
    if (dataNode != null) {
      // Each child of "data" is one time slice holding a list of data points.
      for (JsonNode listNode : dataNode) {
        List<WorkerBenchDataPoint> dataPoints = new ArrayList<>();
        for (JsonNode subListNode : listNode) {
          dataPoints.add(mapper.treeToValue(subListNode, WorkerBenchDataPoint.class));
        }
        data.add(dataPoints);
      }
    }
    return new WorkerBenchCoarseDataPoint(wid, tid, data);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,60 +14,32 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.base.MoreObjects;

/**
* One data point captures the information we collect from one I/O operation to a worker.
* The one operation may be a full scan or positioned read on a file.
*/
@JsonDeserialize(using = WorkerBenchDataPointDeserializer.class)
public class WorkerBenchDataPoint {
@JsonProperty("workerID")
public String mWorkerID;
@JsonProperty("threadID")
public long mThreadID;

@JsonProperty("duration")
public long mDuration;
@JsonProperty("start")
@JsonProperty("startMs")
public long mStartMs;
@JsonProperty("ioBytes")
public long mIOBytes;

/**
* @param workerID the worker this I/O operation reads
* @param threadID the thread performing the I/O
* @param startMs start timestamp of the I/O
* @param duration duration of the file read operation
* @param ioBytes bytes read
*/
@JsonCreator
public WorkerBenchDataPoint(@JsonProperty("workerID") String workerID,
@JsonProperty("threadID") long threadID,
@JsonProperty("start") long startMs,
@JsonProperty("duration") long duration,
@JsonProperty("ioBytes") long ioBytes) {
mWorkerID = workerID;
mThreadID = threadID;
public WorkerBenchDataPoint(long startMs, long duration, long ioBytes) {
mStartMs = startMs;
mDuration = duration;
mIOBytes = ioBytes;
}

/**
* @return worker ID
*/
public String getWorkerID() {
return mWorkerID;
}

/**
* @return thread ID
*/
public long getThreadID() {
return mThreadID;
}

/**
* @return duration in ms
*/
Expand All @@ -89,20 +61,6 @@ public long getIOBytes() {
return mIOBytes;
}

/**
* @param workerID worker ID
*/
public void setWorkerID(String workerID) {
mWorkerID = workerID;
}

/**
* @param threadID the thread ID
*/
public void setThreadID(long threadID) {
mThreadID = threadID;
}

/**
* @param duration duration in ms
*/
Expand All @@ -123,13 +81,4 @@ public void setStartMs(long startMs) {
public void setIOBytes(long ioBytes) {
mIOBytes = ioBytes;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("threadID", mThreadID)
.add("ioBytes", mIOBytes)
.add("duration", mDuration)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ public WorkerBenchDataPoint deserialize(JsonParser parser, DeserializationContex
throws IOException {
JsonNode node = parser.getCodec().readTree(parser);
return new WorkerBenchDataPoint(
node.get("workerID").asText(), node.get("threadID").asLong(),
node.get("startMs").asLong(), node.get("duration").asLong(),
node.get("ioBytes").asLong()
node.get("startMs").asLong(), node.get("duration").asLong(), node.get("ioBytes").asLong()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@
import alluxio.Constants;
import alluxio.collections.Pair;
import alluxio.stress.Parameters;
import alluxio.stress.StressConstants;
import alluxio.stress.Summary;
import alluxio.stress.common.GeneralBenchSummary;
import alluxio.stress.graph.Graph;
import alluxio.stress.graph.LineGraph;
import alluxio.util.FormatUtils;

import com.google.common.base.Splitter;
import org.HdrHistogram.Histogram;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -39,15 +36,15 @@ public final class WorkerBenchSummary extends GeneralBenchSummary<WorkerBenchTas
private long mDurationMs;
private long mEndTimeMs;
private long mIOBytes;
private List<Long> mDurationPercentiles;
private List<Long> mThroughputPercentiles;

/**
* Creates an instance.
* Default constructor required for json deserialization.
*/
public WorkerBenchSummary() {
mNodeResults = new HashMap<>();
mDurationPercentiles = new ArrayList<>();
mThroughputPercentiles = new ArrayList<>();
}

/**
Expand All @@ -65,16 +62,17 @@ public WorkerBenchSummary(WorkerBenchTaskResult mergedTaskResults,
mNodeResults = nodes;
mThroughput = getIOMBps();

mDurationPercentiles = new ArrayList<>();
Histogram durationHistogram = new Histogram(
FormatUtils.parseTimeSize(mParameters.mDuration)
+ FormatUtils.parseTimeSize(mParameters.mWarmup),
StressConstants.TIME_HISTOGRAM_PRECISION);
mergedTaskResults.getDataPoints().forEach(stat ->
durationHistogram.recordValue(stat.getDuration()));
for (int i = 0; i <= 100; i++) {
mDurationPercentiles.add(durationHistogram.getValueAtPercentile(i));
}
// TODO: implement percentile calculation for coarse data points

// mThroughputPercentiles = new ArrayList<>();
// Histogram throughputHistogram = new Histogram(
// FormatUtils.parseSpaceSize(mParameters.mFileSize),
// StressConstants.TIME_HISTOGRAM_PRECISION);
// mergedTaskResults.getDataPoints().forEach(stat ->
// throughputHistogram.recordValue(stat.getInThroughput()));
// for (int i = 0; i <= 100; i++) {
// mThroughputPercentiles.add(throughputHistogram.getValueAtPercentile(i));
// }
}

/**
Expand Down Expand Up @@ -150,15 +148,15 @@ public void setIOBytes(long IOBytes) {
/**
* @return 0~100 percentiles of recorded durations
*/
public List<Long> getDurationPercentiles() {
return mDurationPercentiles;
public List<Long> getThroughputPercentiles() {
return mThroughputPercentiles;
}

/**
* @param percentiles a list of calculated percentiles from recorded durations
*/
public void setDurationPercentiles(List<Long> percentiles) {
mDurationPercentiles = percentiles;
public void setThroughputPercentiles(List<Long> percentiles) {
mThroughputPercentiles = percentiles;
}

@Override
Expand All @@ -185,7 +183,7 @@ public List<Graph> generate(List<? extends Summary> results) {
Pair<List<String>, List<String>> fieldNames = Parameters.partitionFieldNames(
summaries.stream().map(x -> x.mParameters).collect(Collectors.toList()));

// Split up common description into 100 character chunks, for the sub title
// Split up common description into 100 character chunks, for the subtitle
List<String> subTitle = new ArrayList<>(Splitter.fixedLength(100).splitToList(
summaries.get(0).mParameters.getDescription(fieldNames.getFirst())));

Expand Down
Loading
Loading