Skip to content

Commit

Permalink
Parallel processing (#132)
Browse files Browse the repository at this point in the history
* Added a explicit Flush Task to flush data at Thread level once it completes the processing

* Included explicit flush per Thread level

* Done changes for parallel processing

* Removed extra brace

* Removed unused variable

* Removed unused variable initialization

* Did the required formating

* Refactored the code and added required comments & checks
  • Loading branch information
balmukundblr authored Jun 24, 2021
1 parent db26215 commit f1d54f7
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ public class Constants {
public static Boolean[] BOOLEANS = new Boolean[] {Boolean.FALSE, Boolean.TRUE};

public static final int DEFAULT_MAXIMUM_DOCUMENTS = Integer.MAX_VALUE;

public static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread";
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.Locale;
import org.apache.lucene.benchmark.Constants;
import org.apache.lucene.benchmark.byTask.utils.Config;

/**
Expand All @@ -50,8 +51,8 @@ private static final class DateFormatInfo {
private ThreadLocal<DateFormatInfo> dateFormat = new ThreadLocal<>();
private Path dataDir = null;
private ArrayList<Path> inputFiles = new ArrayList<>();
private int nextFile = 0;
private int iteration = 0;
private int[] docCountArr;
private volatile boolean docCountArrCreated;

@Override
public void setConfig(Config config) {
Expand Down Expand Up @@ -100,21 +101,35 @@ public void close() throws IOException {

@Override
public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException {
Path f = null;
String name = null;
synchronized (this) {
if (nextFile >= inputFiles.size()) {
// exhausted files, start a new round, unless forever set to false.
if (!forever) {
throw new NoMoreDataException();
}
nextFile = 0;
iteration++;
}
f = inputFiles.get(nextFile++);
name = f.toRealPath() + "_" + iteration;
if (docCountArrCreated == false) {
docCountArrInit();
}

int threadIndexSize = Thread.currentThread().getName().length();
int parallelTaskThreadSize = Constants.PARALLEL_TASK_THREAD_NAME_PREFIX.length();

// Extract ThreadIndex from unique ThreadName which is set with '"ParallelTaskThread-"+index',
// in TaskSequence.java's doParallelTasks()
int threadIndex =
Integer.parseInt(
Thread.currentThread()
.getName()
.substring(parallelTaskThreadSize + 1, threadIndexSize));

assert (threadIndex >= 0 && threadIndex < docCountArr.length)
: "Please check threadIndex or docCountArr length";
int stride = threadIndex + docCountArr[threadIndex] * docCountArr.length;
int inFileSize = inputFiles.size();

// Modulo Operator covers all three possible senarios i.e. 1. If inputFiles.size() < Num Of
// Threads 2.inputFiles.size() == Num Of Threads 3.inputFiles.size() > Num Of Threads
int fileIndex = stride % inFileSize;
int iteration = stride / inFileSize;
docCountArr[threadIndex]++;

Path f = inputFiles.get(fileIndex);
String name = f.toRealPath() + "_" + iteration;

try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
// First line is the date, 3rd is the title, rest is body
String dateStr = reader.readLine();
Expand Down Expand Up @@ -143,7 +158,12 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc
@Override
public synchronized void resetInputs() throws IOException {
super.resetInputs();
nextFile = 0;
iteration = 0;
}

private synchronized void docCountArrInit() {
if (docCountArrCreated == false) {
docCountArr = new int[getConfig().getNumThreads()];
docCountArrCreated = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.lucene.benchmark.Constants;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
import org.apache.lucene.benchmark.byTask.stats.TaskStats;
Expand Down Expand Up @@ -340,12 +341,23 @@ private int doParallelTasks() throws Exception {

initTasksArray();
ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
// Get number of parallel threads from algo file and set it to use in ReuersContentSource.java's
// docCountArrInit()
this.getRunData().getConfig().setNumThreads(t.length);
// prepare threads
int index = 0;
for (int k = 0; k < repetitions; k++) {
for (int i = 0; i < tasksArray.length; i++) {
final PerfTask task = tasksArray[i].clone();
t[index++] = new ParallelTask(task);
t[index] = new ParallelTask(task);
// Setting unique ThreadName with index value which is used in ReuersContentSource.java's
// getNextDocData().Please make changes
// in ReuersContentSource.java's getNextDocData() for
// Integer.parseInt(Thread.currentThread().getName().substring(parallelTaskThreadSize + 1,
// threadIndexSize))
// before making any modifications here
t[index].setName(Constants.PARALLEL_TASK_THREAD_NAME_PREFIX + "-" + index);
index++;
}
}
// run threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class Config {
private HashMap<String, Object> valByRound = new HashMap<>();
private HashMap<String, String> colForValByRound = new HashMap<>();
private String algorithmText;
private int numThreads = 1;

/**
* Read both algorithm and config properties.
Expand Down Expand Up @@ -113,6 +114,14 @@ public Config(Properties props) {
}
}

public void setNumThreads(int numThreads) {
this.numThreads = numThreads;
}

public int getNumThreads() {
return numThreads;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void printProps() {
System.out.println("------------> config properties:");
Expand Down

0 comments on commit f1d54f7

Please sign in to comment.