Parallel processing #132

Merged
merged 13 commits into from
Jun 24, 2021
Changes from 7 commits
@@ -53,7 +53,7 @@ log.queries=true

{ "Populate"
CreateIndex
[{ "MAddDocs" AddDoc } : 5000] : 4
[{ {{"MAddDocs" AddDoc } : 5000} FlushIndex } ] : 8
ForceMerge(1)
CloseIndex
}
@@ -50,8 +50,9 @@ private static final class DateFormatInfo {
private ThreadLocal<DateFormatInfo> dateFormat = new ThreadLocal<>();
private Path dataDir = null;
private ArrayList<Path> inputFiles = new ArrayList<>();
private int nextFile = 0;
private int iteration = 0;
private int[] threadIndex;
private volatile boolean threadIndexCreated;

@Override
public void setConfig(Config config) {
@@ -102,19 +103,33 @@ public void close() throws IOException {
public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException {
Path f = null;
String name = null;
synchronized (this) {
if (nextFile >= inputFiles.size()) {
// exhausted files, start a new round, unless forever set to false.
if (!forever) {
throw new NoMoreDataException();
}
nextFile = 0;
iteration++;
}
f = inputFiles.get(nextFile++);
name = f.toRealPath() + "_" + iteration;
int inputFilesSize = inputFiles.size();

if (threadIndexCreated == false) {
createThreadIndex();
}

// Getting file index value which is set for each thread
int index = Integer.parseInt(Thread.currentThread().getName().substring(12));
Member

Is TaskSequence / ParallelTask the only place where new Threads are created in benchmarks?

Could you add a comment here pointing to ParallelTask.java explaining that we named/numbered the threads carefully, and that's why this parsing to int is safe?

Contributor Author

- Yes, TaskSequence.java is the only place where new index threads are created.

- We want the index thread names to follow a guaranteed sequence, so we set them explicitly in TaskSequence.java. Each name follows the pattern "IndexThread-N", where N is an integer, so it is safe to parse the suffix as an int.
We'll also add the necessary comments in ReutersContentSource.java.

int fIndex = index + threadIndex[index] * threadIndex.length;
Member

Maybe add assert index >= 0 && index < threadIndex.length above this? This way if there is some thread naming bug, and assertions are enabled, we hit AssertionError before AIOOBE.

Contributor Author

Sure. We'll incorporate this check.
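
The parsing and range check discussed in this thread could look roughly like the sketch below. It is not the code from the PR: the class `ThreadSlot` and the method `slotOf` are hypothetical names, and it throws `IllegalStateException` instead of using `assert` so that the check also fires when assertions are disabled. The prefix matches the "IndexThread-" naming used at this commit, which is why `substring(12)` works in the diff (the prefix is 12 characters long).

```java
class ThreadSlot {
  // Prefix used by the PR at this commit; renaming it later would change the offset.
  static final String PREFIX = "IndexThread-";

  /** Parses the numeric suffix of a thread name and checks it against the slot count. */
  static int slotOf(String threadName, int numSlots) {
    if (!threadName.startsWith(PREFIX)) {
      throw new IllegalStateException("unexpected thread name: " + threadName);
    }
    // Deriving the offset from the prefix avoids the hard-coded 12.
    int slot = Integer.parseInt(threadName.substring(PREFIX.length()));
    if (slot < 0 || slot >= numSlots) {
      throw new IllegalStateException("slot " + slot + " out of range [0, " + numSlots + ")");
    }
    return slot;
  }

  public static void main(String[] args) {
    System.out.println(slotOf("IndexThread-3", 4)); // prints 3
  }
}
```

Failing on an unexpected name keeps a thread-naming bug from surfacing later as an unrelated `NumberFormatException` or `ArrayIndexOutOfBoundsException`.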

threadIndex[index]++;

// Sanity check, if # threads is greater than # input files, wrap index
if (index >= inputFilesSize) {
index %= inputFilesSize;
}

// Check if this thread has exhausted its files
if (fIndex >= inputFilesSize) {
threadIndex[index] = 0;
Member

Hmm, in the case where number-of-threads is bigger than number-of-input-files, aren't we (always) setting the wrong index back to 0 here? Does that matter? Maybe add a dedicated test case so this new code is exercised?
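
To make the question concrete, the index arithmetic from this hunk can be replayed for a single thread in isolation. The sketch below is a simplified, single-threaded replay, not the PR's code: the class `StridedFiles` and the method `picks` are hypothetical, the `iteration` counter is left out, and no concurrency is modelled. The caller passes in the `threadIndex` array so the counters can be inspected afterwards.

```java
import java.util.ArrayList;
import java.util.List;

class StridedFiles {
  /** Replays the diff's index arithmetic for one thread and returns the file indices it picks. */
  static List<Integer> picks(int thread, int[] threadIndex, int numFiles, int calls) {
    List<Integer> out = new ArrayList<>();
    for (int c = 0; c < calls; c++) {
      int index = thread;
      int fIndex = index + threadIndex[index] * threadIndex.length;
      threadIndex[index]++;
      // Wrap the slot when there are more threads than files.
      if (index >= numFiles) {
        index %= numFiles;
      }
      // This thread ran past the last file: restart from its first file.
      if (fIndex >= numFiles) {
        threadIndex[index] = 0; // after wrapping, this may be another thread's counter
        fIndex = index + threadIndex[index] * threadIndex.length;
        threadIndex[index]++;
      }
      out.add(fIndex);
    }
    return out;
  }

  public static void main(String[] args) {
    // 2 threads, 6 files: thread 1 walks files 1, 3, 5 and then wraps.
    System.out.println(picks(1, new int[2], 6, 4)); // prints [1, 3, 5, 1]
    // 4 threads, 2 files: thread 3 is wrapped onto slot 1.
    int[] counters = new int[4];
    System.out.println(picks(3, counters, 2, 3)); // prints [1, 1, 1]
    System.out.println(counters[3] + " " + counters[1]); // prints 3 1
  }
}
```

In this replay the returned file index stays in range, but with 4 threads and 2 files the reset lands on slot 1 while the counter for slot 3 keeps growing, which matches the concern raised in the comment. Whether that matters under real concurrency is not something this sketch can show.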

fIndex = index + threadIndex[index] * threadIndex.length;
threadIndex[index]++;
iteration++;
}

f = inputFiles.get(fIndex);
name = f.toRealPath() + "_" + iteration;

try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
// First line is the date, 3rd is the title, rest is body
String dateStr = reader.readLine();
@@ -143,7 +158,13 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc
@Override
public synchronized void resetInputs() throws IOException {
super.resetInputs();
nextFile = 0;
iteration = 0;
}

private synchronized void createThreadIndex() {
if (threadIndexCreated == false) {
threadIndex = new int[getConfig().getNumThreads()];
threadIndexCreated = true;
}
}
}
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.benchmark.byTask.tasks;

import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.index.IndexWriter;

/** Flush Index Task uses flushNextBuffer() to flush documents at thread level */
public class FlushIndexTask extends PerfTask {

public FlushIndexTask(PerfRunData runData) {
super(runData);
}

@Override
public int doLogic() throws Exception {
IndexWriter iw = getRunData().getIndexWriter();
if (iw != null) {
iw.flushNextBuffer();
Contributor

This flushes one thread; not all. I'm honestly not sure what the use-case is of that method. Did you mean to call iw.flush()?

Contributor Author

Sorry for the delayed response. We observed that post-processing took longer because it was single-threaded and depended on the per-thread indexed data. Hence, we explicitly flush each thread's data as soon as that thread finishes indexing.

Contributor Author

Also, this task is optional and can be used as needed.

}
return 1;
}
}
@@ -340,12 +340,16 @@ private int doParallelTasks() throws Exception {

initTasksArray();
ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
this.getRunData().getConfig().setNumThreads(t.length);
// prepare threads
int index = 0;
for (int k = 0; k < repetitions; k++) {
for (int i = 0; i < tasksArray.length; i++) {
final PerfTask task = tasksArray[i].clone();
t[index++] = new ParallelTask(task);
t[index] = new ParallelTask(task);
// Set the thread name for guaranteed file index while processing.
t[index].setName("IndexThread-" + index);
Member

In general, parallel tasks might be running queries too right? Maybe we should pick a more generic name? Maybe ParallelTaskThread-N?

Contributor Author

Thank you Mike, did the required changes.
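
For illustration, numbering the threads with the more generic prefix might look like the sketch below. The class `NamedWorkers` and the method `create` are hypothetical, and the exact prefix string is an assumption taken from the suggestion above; the final commit may spell it differently.

```java
import java.util.ArrayList;
import java.util.List;

class NamedWorkers {
  // Assumed spelling, based on the reviewer's "ParallelTaskThread-N" suggestion.
  static final String PREFIX = "ParallelTaskThread-";

  /** Creates repetitions * tasksPerRepetition threads named PREFIX + 0, PREFIX + 1, ... */
  static List<Thread> create(int repetitions, int tasksPerRepetition, Runnable body) {
    List<Thread> threads = new ArrayList<>();
    for (int k = 0; k < repetitions; k++) {
      for (int i = 0; i < tasksPerRepetition; i++) {
        // The running count doubles as the thread's slot number.
        int index = threads.size();
        Thread t = new Thread(body);
        t.setName(PREFIX + index);
        threads.add(t);
      }
    }
    return threads;
  }

  public static void main(String[] args) {
    for (Thread t : create(2, 3, () -> {})) {
      System.out.println(t.getName());
    }
  }
}
```

Keeping the prefix in one shared constant means the code that parses the name can derive its offset from the same value instead of a hard-coded number.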

index++;
}
}
// run threads
@@ -54,6 +54,7 @@ public class Config {
private HashMap<String, Object> valByRound = new HashMap<>();
private HashMap<String, String> colForValByRound = new HashMap<>();
private String algorithmText;
private int numThreads = 1;

/**
* Read both algorithm and config properties.
@@ -113,6 +114,14 @@ public Config(Properties props) {
}
}

public void setNumThreads(int numThreads) {
this.numThreads = numThreads;
}

public int getNumThreads() {
return numThreads;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void printProps() {
System.out.println("------------> config properties:");