Can workers begin processing concurrently as the master uploads jobs? #39
Looking at the source, it looks like the answer is currently no: https://github.com/bwlewis/doRedis/blob/master/R/doRedis.R#L492

So in addition to the question of feasibility: am I misunderstanding this as one of the major features of iterators? If all work has to be queued before it can be operated on, how do we get the memory savings associated with iterating over data in a "just in time" manner? Thanks for the help!
This is a good question and an astute observation. The code you refer to:

```r
# use nonblocking call to submit all tasks at once
redisSetPipeline(TRUE)
redisMulti()
while(j <= ntasks)
{
  k <- min(j + chunkSize, ntasks)
  block <- argsList[j:k]
  if(is.null(block)) break
  if(!is.null(gather)) names(block) <- rep(nout, k - j + 1)
  else names(block) <- j:k
  blocknames <- c(blocknames, list(names(block)))
  redisRPush(queue, list(ID=ID, argsList=block))
  j <- k + 1
  nout <- nout + 1
}
redisExec()
redisGetResponse(all=TRUE)
redisSetPipeline(FALSE)
```

could be changed to, for example:

```r
# same loop, but without pipelining: each RPush is sent immediately
while(j <= ntasks)
{
  k <- min(j + chunkSize, ntasks)
  block <- argsList[j:k]
  if(is.null(block)) break
  if(!is.null(gather)) names(block) <- rep(nout, k - j + 1)
  else names(block) <- j:k
  blocknames <- c(blocknames, list(names(block)))
  redisRPush(queue, list(ID=ID, argsList=block))
  j <- k + 1
  nout <- nout + 1
}
```

and then tasks would go into the queue one at a time, and the workers would pull them immediately. For a job with thousands of tasks, this approach will take much longer; I guess I had that code in there for efficient submission of large jobs. The new TCP nodelay setting in the rredis package probably goes a long way toward making this optimization less important. So this would be an interesting change to experiment with, to see how well it works. Would you consider submitting a pull request if this change works well for you?
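For context, the streaming behavior under discussion can be tried end to end with a toy job. This is a minimal sketch, assuming a Redis server on localhost and the standard doRedis API (`registerDoRedis`, `startLocalWorkers`, `removeQueue`); the queue name "jobs" and the worker count are arbitrary:

```r
# Sketch: start workers *before* submitting work, so that if tasks are
# pushed one at a time (un-pipelined), workers can begin on task 1
# while the master is still producing tasks 2..N.
library(doRedis)

registerDoRedis("jobs")                     # register the job queue
startLocalWorkers(n = 2, queue = "jobs")    # workers block on the queue

result <- foreach(i = 1:10, .combine = c, .inorder = FALSE) %dopar% {
  Sys.sleep(0.1)   # stand-in for real per-task work
  i * i
}

removeQueue("jobs")
```

With the pipelined submission shown above, the workers instead see nothing until `redisExec()` flushes the whole batch.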
I will definitely give it a try. But first, there is a lot I don't understand about that code, so I want to clarify: will we create a race condition between the workers and the uploaded work, in which the job could end prematurely? Or will the workers still wait for all the jobs in the case of slow preprocessing? (Don't workers quit when the queue goes empty?)
Workers don't quit when the queue goes empty, but they can be configured to exit after a fixed number of tasks. And I don't see a race condition: recall that with doRedis one can submit tasks without any workers around at all, and the master will just wait until someone comes along and runs them (anonymous, pull-based scheduling). That's its main feature, and what makes it very elastic. But you're right about the second part.
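The "exit after a fixed number of tasks" behavior mentioned here is requested when a worker is started. A minimal sketch, assuming the `iter` argument of `redisWorker` (the number of tasks a worker processes before exiting); the queue name and host are placeholders:

```r
library(doRedis)

# A worker that serves the "jobs" queue, processes at most 100 tasks,
# and then exits instead of blocking forever on an empty queue.
# The iter argument is assumed from the doRedis worker API.
redisWorker(queue = "jobs", host = "localhost", iter = 100)
```

Without `iter`, a worker simply blocks waiting for the next task, which is why an empty queue alone never terminates a job.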
Would be nice to use
Initially it appears that using

Looking at the surrounding code (with a vague understanding): could we also use
Oops, I merged this but now I see that tests are failing. For instance, `R CMD build doRedis && TEST_DOREDIS=TRUE R CMD check doRedis_1.2.0.tar.gz` fails with errors. I'll work on figuring out what's wrong tonight... In the meantime, I reverted this but moved the commit into the 'devel' branch.
Great, thanks for checking it for me. I will look into this as well.
Sorry I've been slow to investigate... lots going on.
When using foreach and doRedis, the doRedis workers wait until all jobs have reached the Redis server before beginning processing. Is it possible to have them begin before all the preprocessing has finished?
I am using an iterator which is working great - preprocessing happens 'just in time' and the job data begins to hit the server as the iterator runs but before it has finished. I can't seem to take advantage of this behavior, though, because the workers just wait until all jobs have been uploaded before they start grabbing them from the queue.
Example code:
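(The original snippet did not survive extraction here; the following is a hypothetical reconstruction from the surrounding description. `complex.iter` and `process.function` are the names used in this issue; `input.data` and the queue name are made up for illustration.)

```r
library(doRedis)
registerDoRedis("jobs")

# complex.iter: a slow, just-in-time preprocessing iterator (hypothetical);
# process.function: the per-element work done on the workers.
results <- foreach(x = complex.iter(input.data), .inorder = FALSE) %dopar%
  process.function(x)

removeQueue("jobs")
```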
In this example, `complex.iter` takes a while and there are many elements to iterate over. As such, it would be great if workers started running `process.function()` before all the preprocessing is finished. Unfortunately, they seem to wait until `complex.iter` has run on all elements.
I have set `.inorder=F`.
Any suggestions as to how to achieve this desired behavior? Thanks.