Parallel processing #132
…pletes the processing
@mikemccand Thanks & Regards, |
This looks close! I'm a little worried about the corner case when number of threads exceeds number of documents.
Can you share what speedup you saw on what kind of concurrent computer with this versus mainline?
}

// Getting file index value which is set for each thread
int index = Integer.parseInt(Thread.currentThread().getName().substring(12));
Is TaskSequence / ParallelTask the only place where new Threads are created in benchmarks?
Could you add a comment here pointing to ParallelTask.java explaining that we named/numbered the threads carefully, and that's why this parsing to int is safe?
- Yes, TaskSequence.java is the only place where new index threads are created.
- We want the names of index threads to follow a guaranteed sequence, so we explicitly set the thread names in TaskSequence.java. Each name follows the pattern "IndexThread-<index>", where <index> is an integer, so it is safe to parse the thread name to an int.
We'll also add the necessary comments in ReutersContentSource.java.

// Getting file index value which is set for each thread
int index = Integer.parseInt(Thread.currentThread().getName().substring(12));
int fIndex = index + threadIndex[index] * threadIndex.length;
Maybe add assert index >= 0 && index < threadIndex.length above this? This way if there is some thread naming bug, and assertions are enabled, we hit AssertionError before AIOOBE.
Sure. We'll incorporate this check.
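For readers following along, here is a minimal sketch of the check discussed above. The class and method names are hypothetical and not taken from the PR; it only assumes the "IndexThread-" prefix and the threadIndex array quoted in this thread. The assert fires only when the JVM runs with -ea, which is why it complements the array bounds check rather than replacing it.

```java
final class ThreadIndexCheckSketch {
  // Length of the "IndexThread-" prefix quoted in this thread (12 characters).
  private static final int PREFIX_LENGTH = "IndexThread-".length();

  // Hypothetical helper: parses the numeric suffix and checks its range.
  static int threadIndexFromName(String threadName, int threadCount) {
    int index = Integer.parseInt(threadName.substring(PREFIX_LENGTH));
    // Active only with -ea; without it, a bad index surfaces later as
    // ArrayIndexOutOfBoundsException at the array access.
    assert index >= 0 && index < threadCount
        : "unexpected thread index " + index + " parsed from " + threadName;
    return index;
  }

  public static void main(String[] args) {
    int[] threadIndex = new int[4]; // one per-thread counter, as in the quoted diff
    int index = threadIndexFromName("IndexThread-3", threadIndex.length);
    int fIndex = index + threadIndex[index] * threadIndex.length;
    System.out.println("index=" + index + ", fIndex=" + fIndex);
  }
}
```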
}

// Check if this thread has exhausted its files
if (fIndex >= inputFilesSize) {
  threadIndex[index] = 0;
Hmm, in the case where number-of-threads is bigger than number-of-input-files, aren't we (always) setting the wrong index back to 0 here? Does that matter? Maybe add a dedicated test case so this new code is exercised?
Thanks for the updates! It looks like |
I think this is really close! I left a few small comments. Thanks @balmukundblr!
int inFileSize = inputFiles.size();

// Modulo operator covers all three possible scenarios: 1. inputFiles.size() < number of threads, 2. inputFiles.size() == number of threads, 3. inputFiles.size() > number of threads
int fileIndex = stride % inFileSize;
Hmm do we already guard for the (degenerate) case of inFileSize == 0? If not can we add some protection here, e.g. maybe throw a clear exception that there is nothing to index?
Mike, it's already handled in ReutersContentSource.java's setConfig(). Here is the relevant code snippet:
if (inputFiles.size() == 0) {
throw new RuntimeException("No txt files in dataDir: "+dataDir.toAbsolutePath());
}
Sorry Mike, I forgot to mention that I've tested with inFileSize == 0 and it throws the expected exception.
t[index++] = new ParallelTask(task);
t[index] = new ParallelTask(task);
// Setting unique thread name with index value, which is used in ReutersContentSource.java's getNextDocData()
t[index].setName("IndexThread-" + index);
In general, parallel tasks might be running queries too, right? Maybe we should pick a more generic name? Maybe ParallelTaskThread-N?
Thank you Mike, I made the required changes.
// prepare threads
int index = 0;
for (int k = 0; k < repetitions; k++) {
  for (int i = 0; i < tasksArray.length; i++) {
    final PerfTask task = tasksArray[i].clone();
    t[index++] = new ParallelTask(task);
    t[index] = new ParallelTask(task);
    // Setting unique thread name with index value, which is used in ReutersContentSource.java's getNextDocData()
Can you strengthen the comment to state that we should NOT change this thread name, unless we also fix the String -> int parsing logic in ReutersContentSource?
Actually, could we factor out this string part of the thread name into a static final String constant, e.g. static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread";, and reference that constant from both places?
Incorporated the required change: the prefix is now a constant in Constants.java, referenced from both places.
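A minimal sketch of what sharing the prefix could look like, assuming the constant name and value suggested in the review. The holder class and the two helper methods are hypothetical names for illustration, not the actual Constants.java or benchmark code.

```java
final class ThreadNameConstantSketch {
  // Constant name and value as suggested in the review comment above.
  static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread";

  // Hypothetical helper for the side that creates and names the threads.
  static String threadNameFor(int index) {
    return PARALLEL_TASK_THREAD_NAME_PREFIX + "-" + index;
  }

  // Hypothetical helper for the side that parses the name back to an int.
  static int indexFromThreadName(String name) {
    // The +1 skips the '-' separator that follows the shared prefix.
    return Integer.parseInt(name.substring(PARALLEL_TASK_THREAD_NAME_PREFIX.length() + 1));
  }

  public static void main(String[] args) throws InterruptedException {
    Thread worker = new Thread(
        () -> System.out.println(indexFromThreadName(Thread.currentThread().getName())));
    worker.setName(threadNameFor(7));
    worker.start();
    worker.join();
  }
}
```

Because both helpers derive from the same constant, renaming the prefix cannot silently break the substring offset, which is the risk with a hard-coded 12.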

// Modulo operator covers all three possible scenarios: 1. inputFiles.size() < number of threads, 2. inputFiles.size() == number of threads, 3. inputFiles.size() > number of threads
int fileIndex = stride % inFileSize;
int iteration = stride / inFileSize;
Thank you for improving this logic -- much easier to understand now!
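To make the modulo/division logic concrete, here is a small sketch. It assumes stride is computed the same way the earlier revision computed fIndex (thread index plus documents taken so far times the number of threads); the class, method, and parameter names are hypothetical, and the empty-input guard only stands in for the check that setConfig() already performs.

```java
final class StrideMappingSketch {
  // Returns {fileIndex, iteration} for one thread's next document.
  static int[] locate(int threadIdx, int docsTaken, int numThreads, int inFileSize) {
    if (inFileSize == 0) {
      // Stand-in for the setConfig() check; avoids division by zero below.
      throw new IllegalArgumentException("no input files to index");
    }
    int stride = threadIdx + docsTaken * numThreads; // assumed definition of stride
    int fileIndex = stride % inFileSize; // which file to read
    int iteration = stride / inFileSize; // how many full passes precede this read
    return new int[] {fileIndex, iteration};
  }

  public static void main(String[] args) {
    // More threads (3) than files (2): indexes wrap instead of running out of bounds.
    int numThreads = 3, inFileSize = 2;
    for (int threadIdx = 0; threadIdx < numThreads; threadIdx++) {
      int[] r = locate(threadIdx, 0, numThreads, inFileSize);
      System.out.println("thread " + threadIdx + " -> file " + r[0] + ", iteration " + r[1]);
    }
  }
}
```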
public int doLogic() throws Exception {
  IndexWriter iw = getRunData().getIndexWriter();
  if (iw != null) {
    iw.flushNextBuffer();
This flushes one thread; not all. I'm honestly not sure what the use-case is of that method. Did you mean to call iw.flush()?
Sorry for the delayed response. We observed that post-processing was taking longer because it was single-threaded and depended on the per-thread indexed data. Hence, we explicitly flush each thread's data as soon as that thread finishes indexing.
Also, this task is optional and can be used purely on an as-needed basis.
@mikemccand |
@mikemccand |
Egads! I don't know why it's stuck in Pending. We can skip it -- I'll confirm |
Thanks @balmukundblr this is a nice improvement in concurrency. I'll try to push soon!
Thanks @balmukundblr -- I merged this with Lucene's main branch and also backported to 8.x (for eventual future 8.10.0 release). |
@mikemccand |
Description
Please note: this is not a new PR. The original PR (apache/lucene-solr#2345) was raised on the old apache/lucene-solr GitHub repository; this is just a copy in the new repo.
Lucene Benchmark Scaling Problem with Reuters Corpus
While indexing 1 million documents with reuters21578 (plain-text documents derived from the reuters21578 corpus), we observed that with a higher number of index threads, indexing throughput does not scale and instead degrades. The existing implementation uses a synchronized block that allows only one thread at a time to pick up a document/file from the list; this code is part of getNextDocData() in ReutersContentSource.java. With multiple index threads, this becomes a thread-contention bottleneck and prevents the system's CPU resources from being used efficiently.
Solution
We developed a strategy to distribute the input files across the indexing threads, so that the threads work independently and in parallel.
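As a rough illustration of the idea (not the PR's actual code), the sketch below gives each thread a disjoint, strided slice of the file list for a single pass, so no thread needs a shared lock to choose its next file. The class and method names are hypothetical, and the thread names simply reuse the "IndexThread-" pattern from the review discussion.

```java
import java.util.ArrayList;
import java.util.List;

final class StridedAssignmentSketch {
  // File indexes that one thread visits in a single pass over numFiles files.
  static List<Integer> filesForThread(int threadIdx, int numThreads, int numFiles) {
    List<Integer> files = new ArrayList<>();
    for (int f = threadIdx; f < numFiles; f += numThreads) {
      files.add(f);
    }
    return files;
  }

  public static void main(String[] args) throws InterruptedException {
    int numThreads = 4, numFiles = 10;
    Thread[] workers = new Thread[numThreads];
    for (int t = 0; t < numThreads; t++) {
      final int threadIdx = t;
      // Each worker computes its own slice; nothing is shared or synchronized.
      workers[t] = new Thread(() -> System.out.println(
          Thread.currentThread().getName() + " -> "
              + filesForThread(threadIdx, numThreads, numFiles)));
      workers[t].setName("IndexThread-" + t);
      workers[t].start();
    }
    for (Thread w : workers) {
      w.join();
    }
  }
}
```

The slices are disjoint and together cover every file, which is what removes the need for the synchronized block described above.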
Tests
We mainly modified the existing getNextDocData() without altering its functionality, hence we did not add any new test cases.
Passed existing tests