From 82b21690e85c9db05ff90d92f6966182fd275e17 Mon Sep 17 00:00:00 2001 From: Mamy Ratsimbazafy Date: Sun, 29 Dec 2019 02:11:20 +0100 Subject: [PATCH] Fix splitted tasks wakeup (#77) * Address #76, send enough loop tasks to woken up children * Fix idle subtree worker count * Fix sending too much tasks to subtree * in the worksharing case we shouldn't add one to the number of thieves estimation, it's already done in the proxy proc * loadBalance on loop tasks entry to address https://github.com/mratsim/weave/issues/76#issuecomment-569428292 * Fix sending the next iteration to children --- weave/config.nim | 6 +-- weave/loop_splitting.nim | 43 +++++++++++++------ weave/memory/memory_pools.nim | 2 +- weave/parallel_for.nim | 1 + weave/parallel_for_staged.nim | 1 + weave/parallel_reduce.nim | 1 + weave/scheduler.nim | 2 +- weave/scheduler_fsm.nim | 2 +- weave/victims.nim | 80 ++++++++++++++++++++++++----------- weave/workers.nim | 2 +- 10 files changed, 96 insertions(+), 44 deletions(-) diff --git a/weave/config.nim b/weave/config.nim index b0ff7a4..d51f73e 100644 --- a/weave/config.nim +++ b/weave/config.nim @@ -94,15 +94,15 @@ const template metrics*(body: untyped): untyped = when defined(WV_Metrics): - {.noSideEffect, gcsafe.}: body + block: {.noSideEffect, gcsafe.}: body template debugTermination*(body: untyped): untyped = when defined(WV_DebugTermination) or defined(WV_Debug): - {.noSideEffect, gcsafe.}: body + block: {.noSideEffect, gcsafe.}: body template debug*(body: untyped): untyped = when defined(WV_Debug): - {.noSideEffect, gcsafe.}: body + block: {.noSideEffect, gcsafe.}: body template StealAdaptative*(body: untyped): untyped = when StealStrategy == StealKind.adaptative: diff --git a/weave/loop_splitting.nim b/weave/loop_splitting.nim index b42d9dc..38987db 100644 --- a/weave/loop_splitting.nim +++ b/weave/loop_splitting.nim @@ -19,15 +19,21 @@ import # - Otherwise they stay on the worker optimizing cache reuse # and minimizing useless scheduler overhead -func splitHalf(task: Task): int {.inline.} = +func splitHalf*(task: Task): int {.inline.} = ## Split loop iteration range in half task.cur + ((task.stop - task.cur + task.stride-1) div task.stride) shr 1 -func roundPrevMultipleOf(x: SomeInteger, step: SomeInteger): SomeInteger {.inline.} = +func roundPrevMultipleOf(x, step: SomeInteger): SomeInteger {.inline.} = ## Round the input to the previous multiple of "step" result = x - x mod step -func splitGuided(task: Task): int {.inline.} = +func roundNextMultipleOf(x, step: SomeInteger): SomeInteger {.inline.} = + ## Round the input to the next multiple + ## Note: roundNextMultipleOf(0, 10) == 10 + ## which is desired as we don't want to return our last iteration + x + step - 1 - (x-1) mod step + +func splitGuided*(task: Task): int {.inline.} = ## Split iteration range based on the number of workers let stepsLeft = (task.stop - task.cur + task.stride-1) div task.stride preCondition: stepsLeft > 0 @@ -39,7 +45,7 @@ func splitGuided(task: Task): int {.inline.} = return task.splitHalf() return roundPrevMultipleOf(task.stop - chunk*task.stride, task.stride) -func splitAdaptative(task: Task, approxNumThieves: int32): int {.inline.} = +func splitAdaptative*(task: Task, approxNumThieves: int32): int {.inline.} = ## Split iteration range based on the number of steal requests let stepsLeft = (task.stop - task.cur + task.stride-1) div task.stride preCondition: stepsLeft > 1 @@ -56,15 +62,26 @@ func splitAdaptative(task: Task, approxNumThieves: int32): int {.inline.} = result = 
roundPrevMultipleOf(task.stop - chunk*task.stride, task.stride) -template split*(task: Task, approxNumThieves: int32): int = - when SplitStrategy == SplitKind.half: - splitHalf(task) - elif SplitStrategy == guided: - splitGuided(task) - elif SplitStrategy == SplitKind.adaptative: - splitAdaptative(task, approxNumThieves) - else: - {.error: "Unreachable".} +func splitAdaptativeDelegated*(task: Task, approxNumThieves, delegateNumThieves: int32): int {.inline.} = + ## Split iteration range based on the number of steal requests + ## When a child subtree needs to be woken up, we need to send enough tasks + ## for its whole trees + all pending steal requests. + let stepsLeft = (task.stop - task.cur + task.stride-1) div task.stride + preCondition: stepsLeft > 1 + preCondition: delegateNumThieves in 1 .. approxNumThieves + + debug: + log("Worker %2d: %ld steps left (start: %d, current: %d, stop: %d, stride: %d, %d thieves)\n", + myID(), stepsLeft, task.start, task.cur, task.stop, task.stride, approxNumThieves) + + # Send a chunk of work to all + let chunk = max(stepsLeft div (approxNumThieves + 1), 1) + ascertain: stepsLeft > chunk + + let workPackage = delegateNumThieves*chunk*task.stride + let nextIter = task.cur + task.stride + result = max(nextIter, roundNextMultipleOf(task.stop - workPackage, task.stride)) + postCondition: result in nextIter ..< task.stop template isSplittable*(t: Task): bool = not t.isNil and t.isLoop and (t.stop - t.cur + t.stride-1) div t.stride > 1 diff --git a/weave/memory/memory_pools.nim b/weave/memory/memory_pools.nim index a424319..110b66e 100644 --- a/weave/memory/memory_pools.nim +++ b/weave/memory/memory_pools.nim @@ -58,7 +58,7 @@ static: assert WV_MemBlockSize >= 256, "WV_MemBlockSize must be greater or equal template debugMem*(body: untyped) = when defined(WV_debugMem): - {.noSideEffect.}: + block: {.noSideEffect.}: body # Memory Pool types diff --git a/weave/parallel_for.nim b/weave/parallel_for.nim index f2ef228..b46730c 100644 --- a/weave/parallel_for.nim +++ b/weave/parallel_for.nim @@ -32,6 +32,7 @@ template parallelForWrapper( ## Loop prologue, epilogue, ## remoteAccum, resultTy and returnStmt ## are unused + loadBalance(Weave) block: let this = myTask() diff --git a/weave/parallel_for_staged.nim b/weave/parallel_for_staged.nim index 641e5cb..54b37cf 100644 --- a/weave/parallel_for_staged.nim +++ b/weave/parallel_for_staged.nim @@ -29,6 +29,7 @@ template parallelStagedWrapper( ## Also poll steal requests in-between iterations ## ## remoteAccum and resultFlowvarType are unused + loadBalance(Weave) prologue diff --git a/weave/parallel_reduce.nim b/weave/parallel_reduce.nim index 36ea5ed..d214629 100644 --- a/weave/parallel_reduce.nim +++ b/weave/parallel_reduce.nim @@ -28,6 +28,7 @@ template parallelReduceWrapper( ## To be called within a loop task ## Gets the loop bounds and iterate the over them ## Also poll steal requests in-between iterations + loadBalance(Weave) prologue diff --git a/weave/scheduler.nim b/weave/scheduler.nim index 3ee89a6..c21ba36 100644 --- a/weave/scheduler.nim +++ b/weave/scheduler.nim @@ -270,7 +270,7 @@ proc worker_entry_fn*(id: WorkerID) {.gcsafe.} = proc schedule*(task: sink Task) = ## Add a new task to be scheduled in parallel preCondition: not task.fn.isNil - debug: log("Worker %2d: scheduling task.fn 0x%.08x\n", myID(), task.fn) + debug: log("Worker %2d: scheduling task.fn 0x%.08x (%d pending)\n", myID(), task.fn, myWorker().deque.pendingTasks) myWorker().deque.addFirst task diff --git a/weave/scheduler_fsm.nim 
b/weave/scheduler_fsm.nim index 7a54d0b..daf88c2 100644 --- a/weave/scheduler_fsm.nim +++ b/weave/scheduler_fsm.nim @@ -87,7 +87,7 @@ behavior(handleThievesFSA): behavior(handleThievesFSA): ini: IT_CanSplit - transition: splitAndSend(poppedTask, req) + transition: splitAndSend(poppedTask, req, workSharing = false) fin: IT_CheckTheft # ------------------------------------------- diff --git a/weave/victims.nim b/weave/victims.nim index 07d415f..fce8d42 100644 --- a/weave/victims.nim +++ b/weave/victims.nim @@ -46,24 +46,20 @@ Backoff: # Victims - Adaptative task splitting # ---------------------------------------------------------------------------------- -proc approxNumThieves(): int32 {.inline.} = - # We estimate the number of idle workers by counting the number of theft attempts - # Notes: - # - We peek into a MPSC channel from the consumer thread: the peek is a lower bound - # as more requests may pile up concurrently. - # - We already read 1 steal request before trying to split so need to add it back. - # - Workers may send steal requests before actually running out-of-work - result = 1 + myThieves().peek() - debug: log("Worker %2d: has %ld steal requests\n", myID(), result) - Backoff: proc approxNumThievesProxy(worker: WorkerID): int32 = # Estimate the number of idle workers of a worker subtree - if worker == Not_a_worker: return 0 + if worker == Not_a_worker: + return 1 # The child worker is also idling result = 0 + var count = 0'i32 for w in traverseBreadthFirst(worker, maxID()): result += getThievesOf(w).peek() - debug: log("Worker %2d: found %ld steal requests addressed to its child %d and grandchildren\n", myID(), result, worker) + count += 1 + debug: log("Worker %2d: found %ld steal requests addressed to its child %d and grandchildren (%d workers) \n", myID(), result, worker, count) + # When approximating the number of thieves we need to take the + # work-sharing requests into account, i.e. 
1 task per child + result += count # Victims - Steal requests handling # ---------------------------------------------------------------------------------- @@ -195,7 +191,45 @@ proc dispatchElseDecline*(req: sink StealRequest) {.gcsafe.}= ascertain: myWorker().deque.isEmpty() decline(req) -proc splitAndSend*(task: Task, req: sink StealRequest) = +proc evalSplit(task: Task, req: StealRequest, workSharing: bool): int {.inline.}= + when SplitStrategy == SplitKind.half: + return splitHalf(task) + elif SplitStrategy == guided: + return splitGuided(task) + elif SplitStrategy == SplitKind.adaptative: + var guessThieves = myThieves().peek() + debug: log("Worker %2d: has %ld steal requests queued\n", myID(), guessThieves) + Backoff: + if workSharing: + # The real splitting will be done by the child worker + # We need to send it enough work for its own children and all the steal requests pending + ascertain: req.thiefID in {myWorker().left, myWorker().right} + var left, right = 0'i32 + if myWorker().leftIsWaiting: + left = approxNumThievesProxy(myWorker().left) + if myWorker().rightIsWaiting: + right = approxNumThievesProxy(myWorker().right) + guessThieves += left + right + debug: + log("Worker %2d: workSharing split, thiefID %d, total subtree thieves %d, left{id: %d, waiting: %d, requests: %d}, right{id: %d, waiting: %d, requests: %d}\n", + myID(), req.thiefID, guessThieves, myWorker().left, myWorker().leftIsWaiting, left, myWorker().right, myWorker().rightIsWaiting, right + ) + if req.thiefID == myWorker().left: + return splitAdaptativeDelegated(task, guessThieves, left) + else: + return splitAdaptativeDelegated(task, guessThieves, right) + # ------------------------------------------------------------ + if myWorker().leftIsWaiting: + guessThieves += approxNumThievesProxy(myWorker().left) + if myWorker().rightIsWaiting: + guessThieves += approxNumThievesProxy(myWorker().right) + # If not "workSharing" we also just dequeued the steal request currently being considered + # so we need to add it back + return splitAdaptative(task, 1+guessThieves) + else: + {.error: "Unreachable".} + +proc splitAndSend*(task: Task, req: sink StealRequest, workSharing: bool) = ## Split a task and send a part to the thief preCondition: req.thiefID != myID() @@ -209,13 +243,7 @@ proc splitAndSend*(task: Task, req: sink StealRequest) = # Split iteration range according to given strategy # [start, stop) => [start, split) + [split, end) - var guessThieves = approxNumThieves() - Backoff: - if myWorker().leftIsWaiting: - guessThieves += approxNumThievesProxy(myWorker().left) - if myWorker().rightIsWaiting: - guessThieves += approxNumThievesProxy(myWorker().right) - let split = split(task, guessThieves) + let split = evalSplit(task, req, workSharing) # New task gets the upper half upperSplit.start = split @@ -225,7 +253,9 @@ proc splitAndSend*(task: Task, req: sink StealRequest) = # Current task continues with lower half task.stop = split - debug: log("Worker %2d: Sending [%ld, %ld) to worker %d\n", myID(), upperSplit.start, upperSplit.stop, req.thiefID) + debug: + let steps = (upperSplit.stop-upperSplit.start + upperSplit.stride-1) div upperSplit.stride + log("Worker %2d: Sending [%ld, %ld) to worker %d (%d steps)\n", myID(), upperSplit.start, upperSplit.stop, req.thiefID, steps) profile(send_recv_task): if upperSplit.hasFuture: @@ -244,10 +274,12 @@ proc splitAndSend*(task: Task, req: sink StealRequest) = req.send(upperSplit) incCounter(tasksSplit) - debug: log("Worker %2d: Continuing with [%ld, %ld)\n", myID(), task.cur, 
task.stop) + debug: + let steps = (task.stop-task.cur + task.stride-1) div task.stride + log("Worker %2d: Continuing with [%ld, %ld) (%d steps)\n", myID(), task.cur, task.stop, steps) proc distributeWork(req: sink StealRequest): bool = - ## Handle incoming steal request + ## Handle children work sharing requests ## Returns true if we found work ## false otherwise @@ -263,7 +295,7 @@ proc distributeWork(req: sink StealRequest): bool = # Otherwise try to split the current one if myTask().isSplittable(): if req.thiefID != myID(): - myTask().splitAndSend(req) + myTask().splitAndSend(req, workSharing = true) return true else: req.forget() diff --git a/weave/workers.nim b/weave/workers.nim index 16174b0..1aea40e 100644 --- a/weave/workers.nim +++ b/weave/workers.nim @@ -36,7 +36,7 @@ proc runTask*(task: Task) {.inline, gcsafe.} = # TODO - logic seems sketchy, why do we do this <-> task. let this = myTask() myTask() = task - debug: log("Worker %2d: running task.fn 0x%.08x\n", myID(), task.fn) + debug: log("Worker %2d: running task.fn 0x%.08x (%d pending)\n", myID(), task.fn, myWorker().deque.pendingTasks) task.fn(task.data.addr) myTask() = this if task.isLoop:
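
Editor's note on the new splitting rule: the splitAdaptativeDelegated/roundNextMultipleOf pair added in loop_splitting.nim is the heart of this fix. When a sleeping child subtree is woken through a work-sharing request, the victim must hand over enough chunks for every thief behind that child, while always keeping at least its own next iteration. Below is a minimal standalone Nim sketch of that arithmetic, assuming a simplified FakeLoopTask record (cur/stop/stride fields) instead of Weave's actual Task type, and plain assert in place of preCondition/postCondition; the numbers in the demo are illustrative only.

# Sketch of the splitting rule introduced in loop_splitting.nim (simplified types).
type
  FakeLoopTask = object        # hypothetical stand-in for Weave's Task
    cur, stop, stride: int

func roundNextMultipleOf(x, step: int): int =
  ## Round `x` up to the next multiple of `step`.
  ## roundNextMultipleOf(0, 10) == 10, so the victim never gives away
  ## the iteration it is about to run itself.
  x + step - 1 - (x-1) mod step

proc splitAdaptativeDelegated(task: FakeLoopTask,
                              approxNumThieves, delegateNumThieves: int): int =
  ## Hand `delegateNumThieves` chunks of `stepsLeft div (approxNumThieves + 1)`
  ## iterations to the woken-up child; the victim keeps [cur, result).
  let stepsLeft = (task.stop - task.cur + task.stride-1) div task.stride
  assert stepsLeft > 1
  assert delegateNumThieves in 1 .. approxNumThieves
  let chunk = max(stepsLeft div (approxNumThieves + 1), 1)
  let workPackage = delegateNumThieves*chunk*task.stride
  let nextIter = task.cur + task.stride
  result = max(nextIter, roundNextMultipleOf(task.stop - workPackage, task.stride))
  assert result in nextIter ..< task.stop

when isMainModule:
  # 100 iterations left, 3 estimated thieves, 2 of them behind the sleeping child:
  # chunk = 25, the child subtree receives [50, 100) and the victim keeps [0, 50).
  let t = FakeLoopTask(cur: 0, stop: 100, stride: 1)
  echo splitAdaptativeDelegated(t, approxNumThieves = 3, delegateNumThieves = 2)  # 50

The same reasoning motivates the approxNumThievesProxy change in victims.nim: each worker in a sleeping child's subtree is counted as one extra request on top of the steal requests already queued for it (the "result += count" and "return 1" lines), because a sleeping subtree needs at least one task per worker to restart. Without that, the woken-up child and the idle workers behind it can be handed too few iterations to keep the subtree busy, which is the under-splitting this patch addresses (#76). Conversely, in the non-work-sharing path evalSplit adds back only the single request currently being handled ("1+guessThieves"), since that request was already dequeued from the thieves channel before the split.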