From f89e5f5dd7efd8d0cb0a919ca24bb7902dea4504 Mon Sep 17 00:00:00 2001 From: zhaozihao Date: Wed, 6 Mar 2024 15:44:32 +0800 Subject: [PATCH 1/4] Support Hash Algorithms Benchmark --- .../alluxio/stress/client/HashParameters.java | 56 ++++ .../alluxio/stress/client/HashTaskResult.java | 183 ++++++++++ .../stress/client/HashTaskSummary.java | 100 ++++++ .../cli/client/StressClientHashBench.java | 315 ++++++++++++++++++ 4 files changed, 654 insertions(+) create mode 100644 dora/stress/common/src/main/java/alluxio/stress/client/HashParameters.java create mode 100644 dora/stress/common/src/main/java/alluxio/stress/client/HashTaskResult.java create mode 100644 dora/stress/common/src/main/java/alluxio/stress/client/HashTaskSummary.java create mode 100644 dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java diff --git a/dora/stress/common/src/main/java/alluxio/stress/client/HashParameters.java b/dora/stress/common/src/main/java/alluxio/stress/client/HashParameters.java new file mode 100644 index 000000000000..4716c9a0f2e1 --- /dev/null +++ b/dora/stress/common/src/main/java/alluxio/stress/client/HashParameters.java @@ -0,0 +1,56 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.stress.client; + +import alluxio.stress.Parameters; + +import com.beust.jcommander.Parameter; + +/** + * Hash test parameters that users can pass in, + * through which the user sets the hash strategy, + * number of virtual nodes, number of node replicas, + * lookup table size, etc. 
+ */ +public class HashParameters extends Parameters { + + @Parameter(names = {"--hash-policy"}, + description = "Use the hash policy to test. " + + "If you want to test multiple hash policies, please separate them with \",\", " + + "such as \"CONSISTENT,MAGLEV\". " + + "There are currently five supported policies: " + + "CONSISTENT, JUMP, KETAMA, MAGLEV, MULTI_PROBE") + public String mHashPolicy = "CONSISTENT,JUMP,KETAMA,MAGLEV,MULTI_PROBE"; + + @Parameter(names = {"--virtual-node-num"}, description = "the number of virtual nodes") + public Integer mVirtualNodeNum = 1000; + + @Parameter(names = {"--worker-num"}, description = "the number of workers") + public Integer mWorkerNum = 10; + + @Parameter(names = {"--node-replicas"}, description = "the number of ketama hashing replicas") + public Integer mNodeReplicas = 1000; + + @Parameter(names = {"--lookup-size"}, + description = "the size of the lookup table in the maglev hashing algorithm") + public Integer mLookupSize = 65537; + + @Parameter(names = {"--probe-num"}, + description = "the number of probes in the multi-probe hashing algorithm") + public Integer mProbeNum = 21; + + @Parameter(names = {"--report-path"}, description = "the path that the report will generate on") + public String mReportPath = "."; + + @Parameter(names = {"--file-num"}, description = "the num of files that will be allocated") + public Integer mFileNum = 1000000; +} diff --git a/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskResult.java b/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskResult.java new file mode 100644 index 000000000000..0297118c5f2f --- /dev/null +++ b/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskResult.java @@ -0,0 +1,183 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). 
You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.stress.client; + +import alluxio.stress.BaseParameters; +import alluxio.stress.Summary; +import alluxio.stress.TaskResult; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.ArrayList; +import java.util.List; + +/** + * Hash test all relevant results, including time, standard deviation, etc. + */ +public class HashTaskResult implements TaskResult { + private List mErrors; + private HashParameters mParameters; + private BaseParameters mBaseParameters; + + /** + * All Hashing Algorithms Result. + */ + private List mAllTestsResult; + + /** + * An empty constructor. + * */ + public HashTaskResult() { + mAllTestsResult = new ArrayList<>(); + mErrors = new ArrayList<>(); + } + + /** + * The constructor used for serialization. 
+ * + * @param errors the errors + * */ + public HashTaskResult(@JsonProperty("errors") List errors) { + mErrors = errors; + } + + /** + * @param errorMsg an error msg to add + * */ + public void addError(String errorMsg) { + mErrors.add(errorMsg); + } + + @Override + public TaskResult.Aggregator aggregator() { + return new HashTaskResult.Aggregator(); + } + + private static final class Aggregator implements TaskResult.Aggregator { + @Override + public Summary aggregate(Iterable results) throws Exception { + return new HashTaskSummary(reduceList(results)); + } + } + + /** + * @return Errors if exist + */ + public List getErrors() { + return mErrors; + } + + /** + * @param baseParameters the {@link BaseParameters} to use + * */ + public void setBaseParameters(BaseParameters baseParameters) { + mBaseParameters = baseParameters; + } + + @Override + public BaseParameters getBaseParameters() { + return mBaseParameters; + } + + /** + * @param errors the errors + * */ + public void setErrors(List errors) { + mErrors = errors; + } + + /** + * @return the {@link HashParameters} + * */ + public HashParameters getParameters() { + return mParameters; + } + + /** + * @param parameters the {@link HashParameters} to use + * */ + public void setParameters(HashParameters parameters) { + mParameters = parameters; + } + + @Override + public String toString() { + String summary = ""; + for (SingleTestResult result: mAllTestsResult) { + summary += String.format( + "Hashing Algorithm=%s, Time Cost=%sms, Standard Deviation=%s, File Reallocated Num=%s%n", + result.mHashAlgorithm, result.mTimeCost, + result.mStandardDeviation, result.mFileReallocatedNum); + } + return summary; + } + + /** + * Results of a single hash test. + */ + public static class SingleTestResult { + /** + * The chosen hashing algorithm. + */ + private String mHashAlgorithm; + + /** + * Hash testing takes time. 
+ */ + private long mTimeCost; + /** + * The standard deviation of the number of files allocated to each worker. + */ + private double mStandardDeviation; + /** + * The number of workers reallocated after deleting a worker. + */ + private int mFileReallocatedNum; + + /** + * @param hashAlgorithm The chosen hashing algorithm + * @param timeCost Hash testing takes time + * @param standardDeviation The standard deviation of the number of files allocated to workers + * @param fileReallocatedNum The number of workers reallocated after deleting a worker + * + */ + public SingleTestResult(String hashAlgorithm, long timeCost, + double standardDeviation, int fileReallocatedNum) { + mHashAlgorithm = hashAlgorithm; + mTimeCost = timeCost; + mStandardDeviation = standardDeviation; + mFileReallocatedNum = fileReallocatedNum; + } + } + + /** + * Add the result of one hash test to the total result. + * @param result The result of one hash test + */ + public void addSingleTestResult(SingleTestResult result) { + mAllTestsResult.add(result); + } + + /** + * Reduce a list of {@link HashTaskResult} into one. + * + * @param results a list of results to combine + * @return the combined result + * */ + public static HashTaskResult reduceList(Iterable results) { + HashTaskResult aggreResult = new HashTaskResult(); + for (HashTaskResult r : results) { + aggreResult = r; + } + return aggreResult; + } +} diff --git a/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskSummary.java b/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskSummary.java new file mode 100644 index 000000000000..db19254bd770 --- /dev/null +++ b/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskSummary.java @@ -0,0 +1,100 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). 
You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.stress.client; + +import alluxio.stress.BaseParameters; +import alluxio.stress.GraphGenerator; +import alluxio.stress.Summary; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * The summary for the Hash test. + */ +public class HashTaskSummary implements Summary { + private static final Logger LOG = LoggerFactory.getLogger(HashTaskSummary.class); + private List mErrors; + private BaseParameters mBaseParameters; + private HashParameters mParameters; + + /** + * Used for deserialization. 
+ * */ + @JsonCreator + public HashTaskSummary() {} + + /** + * @param result the {@link HashTaskResult} to summarize + * */ + public HashTaskSummary(HashTaskResult result) { + mErrors = new ArrayList<>(result.getErrors()); + mBaseParameters = result.getBaseParameters(); + mParameters = result.getParameters(); + } + + /** + * @return the errors recorded + * */ + public List getErrors() { + return mErrors; + } + + /** + * @param errors the errors + * */ + public void setErrors(List errors) { + mErrors = errors; + } + + /** + * @return the {@link BaseParameters} + * */ + public BaseParameters getBaseParameters() { + return mBaseParameters; + } + + /** + * @param baseParameters the {@link BaseParameters} + * */ + public void setBaseParameters(BaseParameters baseParameters) { + mBaseParameters = baseParameters; + } + + /** + * @return the task specific {@link HashParameters} + * */ + public HashParameters getParameters() { + return mParameters; + } + + /** + * @param parameters the {@link HashParameters} + * */ + public void setParameters(HashParameters parameters) { + mParameters = parameters; + } + + @Override + public String toString() { + return String.format("HashTaskSummary: {Errors=%s}%n", mErrors); + } + + @Override + public GraphGenerator graphGenerator() { + return null; + } +} diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java new file mode 100644 index 000000000000..f98308c1d001 --- /dev/null +++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java @@ -0,0 +1,315 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). 
You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.stress.cli.client; + +import alluxio.client.block.BlockWorkerInfo; +import alluxio.client.file.dora.WorkerLocationPolicy; +import alluxio.conf.Configuration; +import alluxio.conf.InstancedConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.membership.WorkerClusterView; +import alluxio.stress.cli.Benchmark; +import alluxio.stress.client.HashParameters; +import alluxio.stress.client.HashTaskResult; +import alluxio.util.ExceptionUtils; +import alluxio.wire.WorkerIdentity; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerNetAddress; + +import com.beust.jcommander.ParametersDelegate; +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Random; + +/** + * A benchmark tool measuring various hashing policy. 
+ * */ +public class StressClientHashBench extends Benchmark { + + private static final Logger LOG = LoggerFactory.getLogger(StressClientHashBench.class); + + @ParametersDelegate + private final HashParameters mParameters = new HashParameters(); + + private List mFileNamesList; + + private WorkerClusterView mWorkers; + private List mWorkerInfos; + + @Override + public String getBenchDescription() { + return String.join("\n", ImmutableList.of( + "A benchmarking tool for various hashing policy", + "This test will measure " + + "program running time. " + + "The standard deviation of the number of assigned workers. ", + "", + + "Example:", + "# This invokes the hashing test", + "# 10 workers will be used", + "# 10000 virtual nodes will be used", + "$ bin/alluxio exec class alluxio.stress.cli.client.StressClientHashBench -- \\" + + "--virtual-node-num 10000 --worker-num 10 --node-replicas 1000 \\" + + "--lookup-size 65537 --probe-num 21", + "" + )); + } + + @Override + public HashTaskResult runLocal() throws Exception { + HashTaskResult result = null; + + // Add timestamp to test report file name + String reportFileName = "Stress-Client-Hash-Bench-" + + new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date()) + ".txt"; + + // If the custom path entered by the user ends with "/", remove this "/". + if (mParameters.mReportPath.endsWith("/")) { + mParameters.mReportPath = + mParameters.mReportPath.substring(0, mParameters.mReportPath.length() - 1); + } + + // If the folder does not exist, create the folder. + File filePath = new File(mParameters.mReportPath); + if (!filePath.exists() && !filePath.isDirectory()) { + filePath.mkdirs(); + } + FileOutputStream fos = new FileOutputStream(mParameters.mReportPath + "/" + reportFileName); + + try { + result = runHashBench(); + LOG.debug("Hash benchmark finished with result: {}", result); + + // Write an explanation of the hash test results. 
+ String resultDescription = + "Each hash algorithm report consists of three results:\n\n" + + "1. Time Cost: The time consumed after the file " + + "is allocated once to judge the efficiency of the algorithm.\n\n" + + "2. Standard Deviation: The standard deviation of the number assigned to each Worker " + + "to judge the uniformity of the algorithm.\n\n" + + "3. File Reallocated: After randomly deleting a Worker, redistribute the File again, " + + "and count how many files assigned to the Worker have changed. " + + "The fewer the number of File moves, the better the consistency of the algorithm.\n\n"; + + fos.write(resultDescription.getBytes()); + + // Write the report into outputFile + fos.write(result.toString().getBytes()); + + // Close the streams + fos.flush(); + fos.close(); + fos = null; + return result; + } catch (Exception e) { + if (result == null) { + LOG.error("Failed run Hash Benchmark", e); + result = new HashTaskResult(); + result.setParameters(mParameters); + result.addError(ExceptionUtils.asPlainText(e)); + } + return result; + } finally { + if (fos != null) { + fos.close(); + } + } + } + + @Override + public void prepare() throws Exception { + mFileNamesList = new ArrayList<>(); + + // Prepare the file name of the simulation file. The file name is randomly generated. + for (int i = 0; i < mParameters.mFileNum; i++) { + mFileNamesList.add(randomString(10)); + } + + // Generate simulated Worker. + mWorkerInfos = new ArrayList<>(); + for (int i = 0; i < mParameters.mWorkerNum; i++) { + mWorkerInfos.add( + new WorkerInfo() + .setIdentity(ofLegacyId(i)) + .setAddress(new WorkerNetAddress() + .setHost("worker" + i).setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0) + ); + } + mWorkers = new WorkerClusterView(mWorkerInfos); + } + + /** + * generate random string as file name. 
@param len the length of the generated string
+ InstancedConfiguration conf = new InstancedConfiguration(Configuration.copyProperties()); + conf.set(PropertyKey.USER_WORKER_SELECTION_POLICY, hashPolicy); + if (hashPolicy.equals("CONSISTENT")) { + conf.set(PropertyKey.USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER, + mParameters.mVirtualNodeNum); + } + else if (hashPolicy.equals("KETAMA")) { + conf.set(PropertyKey.USER_KETAMA_HASH_REPLICAS, mParameters.mNodeReplicas); + } + else if (hashPolicy.equals("MAGLEV")) { + conf.set(PropertyKey.USER_MAGLEV_HASH_LOOKUP_SIZE, mParameters.mLookupSize); + } + else if (hashPolicy.equals("MULTI_PROBE")) { + conf.set(PropertyKey.USER_MULTI_PROBE_HASH_PROBE_NUM, mParameters.mProbeNum); + } + WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(conf); + + // Record the number of files allocated to each worker + HashMap workerCount = new HashMap<>(); + + // 保存每个文件存储到哪个worker + List fileWorkerList = new ArrayList<>(); + + // Simulate the process of hashing a file and then assigning it to a worker + for (int i = 0; i < mParameters.mFileNum; i++) { + String fileName = mFileNamesList.get(i); + List workers = policy.getPreferredWorkers(mWorkers, fileName, 1); + for (BlockWorkerInfo worker : workers) { + if (workerCount.containsKey(worker.getIdentity())) { + workerCount.put(worker.getIdentity(), workerCount.get(worker.getIdentity()) + 1); + } else { + workerCount.put(worker.getIdentity(), 1); + } + fileWorkerList.add(worker.getIdentity()); + } + } + long endTime = System.currentTimeMillis(); + long timeCost = endTime - startTime; + + // Count how many files are allocated on each worker. + List workerCountList = new ArrayList<>(workerCount.values()); + + // Calculate the standard deviation of the number of files allocated to each worker above. 
+ double standardDeviation = getStandardDeviation(workerCountList); + + // randomly removing a Worker + mWorkerInfos.remove((int) (Math.random() * mParameters.mWorkerNum)); + mWorkers = new WorkerClusterView(mWorkerInfos); + + // Randomly remove a Worker and re-simulate the file allocation process + // to determine the consistency of the algorithm. + policy = WorkerLocationPolicy.Factory.create(conf); + workerCount.clear(); + List newFileWorkerList = new ArrayList<>(); + + for (int i = 0; i < mParameters.mFileNum; i++) { + String fileName = mFileNamesList.get(i); + List workers = policy.getPreferredWorkers(mWorkers, fileName, 1); + for (BlockWorkerInfo worker : workers) { + newFileWorkerList.add(worker.getIdentity()); + } + } + + // Compare how many files stored in the Worker have changed + int fileReallocatedNum = 0; + for (int i = 0; i < mParameters.mFileNum; i++) { + if (fileWorkerList.get(i) != newFileWorkerList.get(i)) { + ++fileReallocatedNum; + } + } + + return new HashTaskResult.SingleTestResult( + hashPolicy, timeCost, standardDeviation, fileReallocatedNum); + } + + /** + * @param list Stores the number used to calculate the standard deviation + * @return Standard deviation of all numbers in List + */ + private double getStandardDeviation(List list) { + double variance = 0; + double average = 0; + for (int i = 0; i < list.size(); i++) { + average += list.get(i); + } + average /= list.size(); + for (int i = 0; i < list.size(); i++) { + variance += Math.pow(list.get(i) - average, 2); + } + variance /= list.size(); + + // Keep to two decimal places + return Math.round(Math.sqrt(variance) * 100) / 100.0; + } +} From dff2efe4937c596c28e94cd3783f2041df224614 Mon Sep 17 00:00:00 2001 From: zhaozihao Date: Wed, 6 Mar 2024 19:13:14 +0800 Subject: [PATCH 2/4] Support Hash Algorithms Benchmark --- .../alluxio/stress/client/HashParameters.java | 2 +- .../cli/client/StressClientHashBench.java | 25 +++++++++++++++---- 2 files changed, 21 insertions(+), 6 deletions(-) 
diff --git a/dora/stress/common/src/main/java/alluxio/stress/client/HashParameters.java b/dora/stress/common/src/main/java/alluxio/stress/client/HashParameters.java index 4716c9a0f2e1..1a588eaab1b2 100644 --- a/dora/stress/common/src/main/java/alluxio/stress/client/HashParameters.java +++ b/dora/stress/common/src/main/java/alluxio/stress/client/HashParameters.java @@ -28,7 +28,7 @@ public class HashParameters extends Parameters { + "If you want to test multiple hash policies, please separate them with \",\", " + "such as \"CONSISTENT,MAGLEV\". " + "There are currently five supported policies: " - + "CONSISTENT, JUMP, KETAMA, MAGLEV, MULTI_PROBE") + + "CONSISTENT,JUMP,KETAMA,MAGLEV,MULTI_PROBE") public String mHashPolicy = "CONSISTENT,JUMP,KETAMA,MAGLEV,MULTI_PROBE"; @Parameter(names = {"--virtual-node-num"}, description = "the number of virtual nodes") diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java index f98308c1d001..acf23ef49f59 100644 --- a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java +++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java @@ -60,18 +60,33 @@ public class StressClientHashBench extends Benchmark { public String getBenchDescription() { return String.join("\n", ImmutableList.of( "A benchmarking tool for various hashing policy", - "This test will measure " - + "program running time. " - + "The standard deviation of the number of assigned workers. ", + "This test will measure:\n" + + "1. Time Cost: The time consumed after the file " + + "is allocated once to judge the efficiency of the algorithm.\n\n" + + "2. Standard Deviation: The standard deviation of the number assigned to each " + + "worker to judge the uniformity of the algorithm.\n\n" + + "3. 
File Reallocated: After randomly deleting a Worker, redistribute the File again, " + + "and count how many files assigned to the Worker have changed. " + + "The fewer the number of File moves, " + + "the better the consistency of the algorithm.\n\n", "", "Example:", - "# This invokes the hashing test", + "# This invokes the hashing tests", + "# 5 hash algorithms will be tested: CONSISTENT, JUMP, KETAMA, MAGLEV, MULTI_PROBE", "# 10 workers will be used", "# 10000 virtual nodes will be used", + "# 10 workers will be used", + "# 1000 worker replicas will be used", + "# the size of lookup table is 65537 (must be a prime)", + "# the num of probes is 21", + "# The report will be generated under the current path", + "# The number of simulation test files is 1,000,000", "$ bin/alluxio exec class alluxio.stress.cli.client.StressClientHashBench -- \\" + + "--hash-policy CONSISTENT,JUMP,KETAMA,MAGLEV,MULTI_PROBE \\" + "--virtual-node-num 10000 --worker-num 10 --node-replicas 1000 \\" - + "--lookup-size 65537 --probe-num 21", + + "--lookup-size 65537 --probe-num 21 --report-path . \\" + + "--file-num 1000000", "" )); } From bd3b6f1c3233221617a823be5e289b61238240ce Mon Sep 17 00:00:00 2001 From: zhaozihao Date: Thu, 7 Mar 2024 10:33:13 +0800 Subject: [PATCH 3/4] Support Hash Algorithms Benchmark --- .../alluxio/stress/client/HashTaskSummary.java | 6 +++++- .../stress/cli/client/StressClientHashBench.java | 16 ++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskSummary.java b/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskSummary.java index db19254bd770..cb9746473c9e 100644 --- a/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskSummary.java +++ b/dora/stress/common/src/main/java/alluxio/stress/client/HashTaskSummary.java @@ -23,7 +23,11 @@ import java.util.List; /** - * The summary for the Hash test. 
+ * This class actually does not work at the moment, + * but if StressClientHashBench inherits the Benchmark interface, + * it must implement the most basic functions of the HashTaskSummary class. + * If the hash algorithm benchmark adds complex functions in the future, + * the functions can be expanded on this class. */ public class HashTaskSummary implements Summary { private static final Logger LOG = LoggerFactory.getLogger(HashTaskSummary.class); diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java index acf23ef49f59..3c37053688e7 100644 --- a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java +++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java @@ -235,6 +235,8 @@ private HashTaskResult.SingleTestResult testHashPolicy(String hashPolicy) // Add the parameters set by the user to the conf file. 
InstancedConfiguration conf = new InstancedConfiguration(Configuration.copyProperties()); conf.set(PropertyKey.USER_WORKER_SELECTION_POLICY, hashPolicy); + + // Configure the parameters corresponding to each hash algorithm if (hashPolicy.equals("CONSISTENT")) { conf.set(PropertyKey.USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER, mParameters.mVirtualNodeNum); @@ -248,28 +250,36 @@ else if (hashPolicy.equals("MAGLEV")) { else if (hashPolicy.equals("MULTI_PROBE")) { conf.set(PropertyKey.USER_MULTI_PROBE_HASH_PROBE_NUM, mParameters.mProbeNum); } + + // Create a hash policy WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(conf); // Record the number of files allocated to each worker HashMap workerCount = new HashMap<>(); - // 保存每个文件存储到哪个worker + // Record which worker each file is stored to List fileWorkerList = new ArrayList<>(); // Simulate the process of hashing a file and then assigning it to a worker for (int i = 0; i < mParameters.mFileNum; i++) { String fileName = mFileNamesList.get(i); List workers = policy.getPreferredWorkers(mWorkers, fileName, 1); + + // Add 1 to the cumulative value corresponding to the worker allocated by file for (BlockWorkerInfo worker : workers) { if (workerCount.containsKey(worker.getIdentity())) { workerCount.put(worker.getIdentity(), workerCount.get(worker.getIdentity()) + 1); } else { workerCount.put(worker.getIdentity(), 1); } + // Record which worker the file is assigned to, + // so that you can compare the changes after removing the worker later. fileWorkerList.add(worker.getIdentity()); } } long endTime = System.currentTimeMillis(); + + // Here we count the cumulative time of allocating all files. long timeCost = endTime - startTime; // Count how many files are allocated on each worker. @@ -288,6 +298,7 @@ else if (hashPolicy.equals("MULTI_PROBE")) { workerCount.clear(); List newFileWorkerList = new ArrayList<>(); + // Repeat the file allocation process. 
for (int i = 0; i < mParameters.mFileNum; i++) { String fileName = mFileNamesList.get(i); List workers = policy.getPreferredWorkers(mWorkers, fileName, 1); @@ -324,7 +335,8 @@ private double getStandardDeviation(List list) { } variance /= list.size(); - // Keep to two decimal places + // Take the square root of the difference to get the standard deviation, + // and keep it to two decimal places. return Math.round(Math.sqrt(variance) * 100) / 100.0; } } From 717013a826d1b82c2ef0ca9af1c1f831ecf6160c Mon Sep 17 00:00:00 2001 From: zhaozihao Date: Fri, 8 Mar 2024 08:24:08 +0800 Subject: [PATCH 4/4] Support Hash Algorithms Benchmark --- .../cli/client/StressClientHashBench.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java index 3c37053688e7..710343b736e9 100644 --- a/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java +++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/client/StressClientHashBench.java @@ -160,20 +160,6 @@ public void prepare() throws Exception { for (int i = 0; i < mParameters.mFileNum; i++) { mFileNamesList.add(randomString(10)); } - - // Generate simulated Worker. - mWorkerInfos = new ArrayList<>(); - for (int i = 0; i < mParameters.mWorkerNum; i++) { - mWorkerInfos.add( - new WorkerInfo() - .setIdentity(ofLegacyId(i)) - .setAddress(new WorkerNetAddress() - .setHost("worker" + i).setRpcPort(29998).setDataPort(29999).setWebPort(30000)) - .setCapacityBytes(1024) - .setUsedBytes(0) - ); - } - mWorkers = new WorkerClusterView(mWorkerInfos); } /** @@ -230,6 +216,20 @@ private HashTaskResult runHashBench() throws Exception { // Test a hash algorithm private HashTaskResult.SingleTestResult testHashPolicy(String hashPolicy) throws ResourceExhaustedException { + // Generate simulated Worker. 
+ mWorkerInfos = new ArrayList<>(); + for (int i = 0; i < mParameters.mWorkerNum; i++) { + mWorkerInfos.add( + new WorkerInfo() + .setIdentity(ofLegacyId(i)) + .setAddress(new WorkerNetAddress() + .setHost("worker" + i).setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0) + ); + } + mWorkers = new WorkerClusterView(mWorkerInfos); + long startTime = System.currentTimeMillis(); // Add the parameters set by the user to the conf file.