From f1d54f7c351961a4bc00ff0880d0ab1b7a4e9890 Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Thu, 24 Jun 2021 18:47:19 +0530 Subject: [PATCH] Parallel processing (#132) * Added an explicit Flush Task to flush data at Thread level once it completes the processing * Included explicit flush per Thread level * Done changes for parallel processing * Removed extra brace * Removed unused variable * Removed unused variable initialization * Did the required formatting * Refactored the code and added required comments & checks --- .../apache/lucene/benchmark/Constants.java | 2 + .../byTask/feeds/ReutersContentSource.java | 54 +++++++++++++------ .../benchmark/byTask/tasks/TaskSequence.java | 14 ++++- .../lucene/benchmark/byTask/utils/Config.java | 9 ++++ 4 files changed, 61 insertions(+), 18 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java index 53a1a25fc0b6..86534eaf99cb 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java @@ -26,4 +26,6 @@ public class Constants { public static Boolean[] BOOLEANS = new Boolean[] {Boolean.FALSE, Boolean.TRUE}; public static final int DEFAULT_MAXIMUM_DOCUMENTS = Integer.MAX_VALUE; + + public static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread"; } diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 76c229ff9088..8e040b7f134d 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.Locale; +import 
org.apache.lucene.benchmark.Constants; import org.apache.lucene.benchmark.byTask.utils.Config; /** @@ -50,8 +51,8 @@ private static final class DateFormatInfo { private ThreadLocal dateFormat = new ThreadLocal<>(); private Path dataDir = null; private ArrayList inputFiles = new ArrayList<>(); - private int nextFile = 0; - private int iteration = 0; + private int[] docCountArr; + private volatile boolean docCountArrCreated; @Override public void setConfig(Config config) { @@ -100,21 +101,35 @@ public void close() throws IOException { @Override public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException { - Path f = null; - String name = null; - synchronized (this) { - if (nextFile >= inputFiles.size()) { - // exhausted files, start a new round, unless forever set to false. - if (!forever) { - throw new NoMoreDataException(); - } - nextFile = 0; - iteration++; - } - f = inputFiles.get(nextFile++); - name = f.toRealPath() + "_" + iteration; + if (docCountArrCreated == false) { + docCountArrInit(); } + int threadIndexSize = Thread.currentThread().getName().length(); + int parallelTaskThreadSize = Constants.PARALLEL_TASK_THREAD_NAME_PREFIX.length(); + + // Extract ThreadIndex from unique ThreadName which is set with '"ParallelTaskThread-"+index', + // in TaskSequence.java's doParallelTasks() + int threadIndex = + Integer.parseInt( + Thread.currentThread() + .getName() + .substring(parallelTaskThreadSize + 1, threadIndexSize)); + + assert (threadIndex >= 0 && threadIndex < docCountArr.length) + : "Please check threadIndex or docCountArr length"; + int stride = threadIndex + docCountArr[threadIndex] * docCountArr.length; + int inFileSize = inputFiles.size(); + + // Modulo Operator covers all three possible scenarios i.e. 1. 
If inputFiles.size() < Num Of + // Threads 2. inputFiles.size() == Num Of Threads 3. inputFiles.size() > Num Of Threads + int fileIndex = stride % inFileSize; + int iteration = stride / inFileSize; + docCountArr[threadIndex]++; + + Path f = inputFiles.get(fileIndex); + String name = f.toRealPath() + "_" + iteration; + try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) { // First line is the date, 3rd is the title, rest is body String dateStr = reader.readLine(); @@ -143,7 +158,12 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc @Override public synchronized void resetInputs() throws IOException { super.resetInputs(); - nextFile = 0; - iteration = 0; + } + + private synchronized void docCountArrInit() { + if (docCountArrCreated == false) { + docCountArr = new int[getConfig().getNumThreads()]; + docCountArrCreated = true; + } } } diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index f74aaab11f0c..bb049d490ffe 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import org.apache.lucene.benchmark.Constants; import org.apache.lucene.benchmark.byTask.PerfRunData; import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException; import org.apache.lucene.benchmark.byTask.stats.TaskStats; @@ -340,12 +341,23 @@ private int doParallelTasks() throws Exception { initTasksArray(); ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()]; + // Get number of parallel threads from algo file and set it to use in ReutersContentSource.java's + // docCountArrInit() + this.getRunData().getConfig().setNumThreads(t.length); // 
prepare threads int index = 0; for (int k = 0; k < repetitions; k++) { for (int i = 0; i < tasksArray.length; i++) { final PerfTask task = tasksArray[i].clone(); - t[index++] = new ParallelTask(task); + t[index] = new ParallelTask(task); + // Setting unique ThreadName with index value which is used in ReutersContentSource.java's + // getNextDocData(). Please make changes + // in ReutersContentSource.java's getNextDocData() for + // Integer.parseInt(Thread.currentThread().getName().substring(parallelTaskThreadSize + 1, + // threadIndexSize)) + // before making any modifications here + t[index].setName(Constants.PARALLEL_TASK_THREAD_NAME_PREFIX + "-" + index); + index++; } } // run threads diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java index 74709156c9a5..5eafb553fcf7 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java @@ -54,6 +54,7 @@ public class Config { private HashMap valByRound = new HashMap<>(); private HashMap colForValByRound = new HashMap<>(); private String algorithmText; + private int numThreads = 1; /** * Read both algorithm and config properties. @@ -113,6 +114,14 @@ public Config(Properties props) { } } + public void setNumThreads(int numThreads) { + this.numThreads = numThreads; + } + + public int getNumThreads() { + return numThreads; + } + @SuppressWarnings({"unchecked", "rawtypes"}) private void printProps() { System.out.println("------------> config properties:");