Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/mratsim/weave
Browse files Browse the repository at this point in the history
  • Loading branch information
mratsim committed Dec 29, 2019
2 parents 5696d94 + 82b2169 commit 9e66261
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 44 deletions.
6 changes: 3 additions & 3 deletions weave/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ const

template metrics*(body: untyped): untyped =
when defined(WV_Metrics):
{.noSideEffect, gcsafe.}: body
block: {.noSideEffect, gcsafe.}: body

template debugTermination*(body: untyped): untyped =
when defined(WV_DebugTermination) or defined(WV_Debug):
{.noSideEffect, gcsafe.}: body
block: {.noSideEffect, gcsafe.}: body

template debug*(body: untyped): untyped =
when defined(WV_Debug):
{.noSideEffect, gcsafe.}: body
block: {.noSideEffect, gcsafe.}: body

template StealAdaptative*(body: untyped): untyped =
when StealStrategy == StealKind.adaptative:
Expand Down
43 changes: 30 additions & 13 deletions weave/loop_splitting.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@ import
# - Otherwise they stay on the worker optimizing cache reuse
# and minimizing useless scheduler overhead

func splitHalf(task: Task): int {.inline.} =
func splitHalf*(task: Task): int {.inline.} =
  ## Split the loop iteration range in half.
  # Remaining steps, counting a partial trailing stride as a full step.
  let stepsLeft = (task.stop - task.cur + task.stride-1) div task.stride
  # NOTE(review): for task.stride > 1 the returned index is not rounded to a
  # stride boundary, unlike splitGuided/splitAdaptative which round with
  # roundPrevMultipleOf — confirm callers only need [cur, split) / [split, stop).
  result = task.cur + stepsLeft shr 1

func roundPrevMultipleOf(x: SomeInteger, step: SomeInteger): SomeInteger {.inline.} =
func roundPrevMultipleOf(x, step: SomeInteger): SomeInteger {.inline.} =
  ## Round `x` down to the previous multiple of `step`
  ## (an exact multiple maps to itself).
  x - x mod step

func splitGuided(task: Task): int {.inline.} =
func roundNextMultipleOf(x, step: SomeInteger): SomeInteger {.inline.} =
  ## Round `x` up to the next multiple of `step`.
  ## An exact positive multiple maps to itself, but 0 maps to `step`
  ## (roundNextMultipleOf(0, 10) == 10), which is desired:
  ## a split must never hand back our very last iteration.
  result = x + step - 1 - (x - 1) mod step

func splitGuided*(task: Task): int {.inline.} =
## Split iteration range based on the number of workers
let stepsLeft = (task.stop - task.cur + task.stride-1) div task.stride
preCondition: stepsLeft > 0
Expand All @@ -39,7 +45,7 @@ func splitGuided(task: Task): int {.inline.} =
return task.splitHalf()
return roundPrevMultipleOf(task.stop - chunk*task.stride, task.stride)

func splitAdaptative(task: Task, approxNumThieves: int32): int {.inline.} =
func splitAdaptative*(task: Task, approxNumThieves: int32): int {.inline.} =
## Split iteration range based on the number of steal requests
let stepsLeft = (task.stop - task.cur + task.stride-1) div task.stride
preCondition: stepsLeft > 1
Expand All @@ -56,15 +62,26 @@ func splitAdaptative(task: Task, approxNumThieves: int32): int {.inline.} =

result = roundPrevMultipleOf(task.stop - chunk*task.stride, task.stride)

template split*(task: Task, approxNumThieves: int32): int =
when SplitStrategy == SplitKind.half:
splitHalf(task)
elif SplitStrategy == guided:
splitGuided(task)
elif SplitStrategy == SplitKind.adaptative:
splitAdaptative(task, approxNumThieves)
else:
{.error: "Unreachable".}
func splitAdaptativeDelegated*(task: Task, approxNumThieves, delegateNumThieves: int32): int {.inline.} =
  ## Pick a split point when the actual splitting is delegated to a child
  ## worker that must be woken up: we reserve enough iterations to feed the
  ## child's whole subtree plus all the steal requests pending there.
  let remainingSteps = (task.stop - task.cur + task.stride-1) div task.stride
  preCondition: remainingSteps > 1
  preCondition: delegateNumThieves in 1 .. approxNumThieves

  debug:
    log("Worker %2d: %ld steps left (start: %d, current: %d, stop: %d, stride: %d, %d thieves)\n",
      myID(), remainingSteps, task.start, task.cur, task.stop, task.stride, approxNumThieves)

  # One fair share of the remaining steps per thief (and one kept for us),
  # never less than a single step.
  let fairShare = max(remainingSteps div (approxNumThieves + 1), 1)
  ascertain: remainingSteps > fairShare

  # Iterations handed over: one fair share per thief in the delegated subtree.
  let delegatedIters = delegateNumThieves*fairShare*task.stride
  # We always keep at least our next iteration for ourselves.
  let firstDelegated = task.cur + task.stride
  result = max(firstDelegated, roundNextMultipleOf(task.stop - delegatedIters, task.stride))
  postCondition: result in firstDelegated ..< task.stop

template isSplittable*(t: Task): bool =
  # A task is splittable when it is a non-nil loop task with more than one
  # step left. Steps are counted with ceil division so a partial trailing
  # stride counts as a full step. `isNil` is checked first (short-circuit)
  # before touching any field.
  not t.isNil and t.isLoop and (t.stop - t.cur + t.stride-1) div t.stride > 1
2 changes: 1 addition & 1 deletion weave/memory/memory_pools.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static: assert WV_MemBlockSize >= 256, "WV_MemBlockSize must be greater or equal

template debugMem*(body: untyped) =
when defined(WV_debugMem):
{.noSideEffect.}:
block: {.noSideEffect.}:
body

# Memory Pool types
Expand Down
1 change: 1 addition & 0 deletions weave/parallel_for.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ template parallelForWrapper(
## Loop prologue, epilogue,
## remoteAccum, resultTy and returnStmt
## are unused
loadBalance(Weave)

block:
let this = myTask()
Expand Down
1 change: 1 addition & 0 deletions weave/parallel_for_staged.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ template parallelStagedWrapper(
## Also poll steal requests in-between iterations
##
## remoteAccum and resultFlowvarType are unused
loadBalance(Weave)

prologue

Expand Down
1 change: 1 addition & 0 deletions weave/parallel_reduce.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ template parallelReduceWrapper(
## To be called within a loop task
## Gets the loop bounds and iterate the over them
## Also poll steal requests in-between iterations
loadBalance(Weave)

prologue

Expand Down
2 changes: 1 addition & 1 deletion weave/scheduler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ proc worker_entry_fn*(id: WorkerID) {.gcsafe.} =
proc schedule*(task: sink Task) =
## Add a new task to be scheduled in parallel
preCondition: not task.fn.isNil
debug: log("Worker %2d: scheduling task.fn 0x%.08x\n", myID(), task.fn)
debug: log("Worker %2d: scheduling task.fn 0x%.08x (%d pending)\n", myID(), task.fn, myWorker().deque.pendingTasks)

myWorker().deque.addFirst task

Expand Down
2 changes: 1 addition & 1 deletion weave/scheduler_fsm.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ behavior(handleThievesFSA):

behavior(handleThievesFSA):
ini: IT_CanSplit
transition: splitAndSend(poppedTask, req)
transition: splitAndSend(poppedTask, req, workSharing = false)
fin: IT_CheckTheft

# -------------------------------------------
Expand Down
80 changes: 56 additions & 24 deletions weave/victims.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,20 @@ Backoff:
# Victims - Adaptative task splitting
# ----------------------------------------------------------------------------------

proc approxNumThieves(): int32 {.inline.} =
# We estimate the number of idle workers by counting the number of theft attempts
# Notes:
# - We peek into a MPSC channel from the consumer thread: the peek is a lower bound
# as more requests may pile up concurrently.
# - We already read 1 steal request before trying to split so need to add it back.
# - Workers may send steal requests before actually running out-of-work
result = 1 + myThieves().peek()
debug: log("Worker %2d: has %ld steal requests\n", myID(), result)

Backoff:
proc approxNumThievesProxy(worker: WorkerID): int32 =
# Estimate the number of idle workers of a worker subtree
if worker == Not_a_worker: return 0
if worker == Not_a_worker:
return 1 # The child worker is also idling
result = 0
var count = 0'i32
for w in traverseBreadthFirst(worker, maxID()):
result += getThievesOf(w).peek()
debug: log("Worker %2d: found %ld steal requests addressed to its child %d and grandchildren\n", myID(), result, worker)
count += 1
debug: log("Worker %2d: found %ld steal requests addressed to its child %d and grandchildren (%d workers) \n", myID(), result, worker, count)
# When approximating the number of thieves we need to take the
# work-sharing requests into account, i.e. 1 task per child
result += count

# Victims - Steal requests handling
# ----------------------------------------------------------------------------------
Expand Down Expand Up @@ -195,7 +191,45 @@ proc dispatchElseDecline*(req: sink StealRequest) {.gcsafe.}=
ascertain: myWorker().deque.isEmpty()
decline(req)

proc splitAndSend*(task: Task, req: sink StealRequest) =
proc evalSplit(task: Task, req: StealRequest, workSharing: bool): int {.inline.}=
  ## Compute the split point for `task` according to the compile-time
  ## SplitStrategy. `workSharing` is true when `req` comes from a direct
  ## child being woken up; the real split is then delegated to that child
  ## and must carry enough work for its whole subtree.
  when SplitStrategy == SplitKind.half:
    return splitHalf(task)
  elif SplitStrategy == guided:
    return splitGuided(task)
  elif SplitStrategy == SplitKind.adaptative:
    # Peeking a MPSC channel from the consumer side is a lower bound:
    # more requests may pile up concurrently.
    var guessThieves = myThieves().peek()
    debug: log("Worker %2d: has %ld steal requests queued\n", myID(), guessThieves)
    Backoff:
      if workSharing:
        # The real splitting will be done by the child worker
        # We need to send it enough work for its own children and all the steal requests pending
        ascertain: req.thiefID in {myWorker().left, myWorker().right}
        var left, right = 0'i32
        if myWorker().leftIsWaiting:
          left = approxNumThievesProxy(myWorker().left)
        if myWorker().rightIsWaiting:
          right = approxNumThievesProxy(myWorker().right)
        guessThieves += left + right
        debug:
          log("Worker %2d: workSharing split, thiefID %d, total subtree thieves %d, left{id: %d, waiting: %d, requests: %d}, right{id: %d, waiting: %d, requests: %d}\n",
            myID(), req.thiefID, guessThieves, myWorker().left, myWorker().leftIsWaiting, left, myWorker().right, myWorker().rightIsWaiting, right
          )
        # Size the delegated split for whichever child sent the request.
        if req.thiefID == myWorker().left:
          return splitAdaptativeDelegated(task, guessThieves, left)
        else:
          return splitAdaptativeDelegated(task, guessThieves, right)
      # ------------------------------------------------------------
      # Plain steal: also count thieves hidden in sleeping subtrees.
      if myWorker().leftIsWaiting:
        guessThieves += approxNumThievesProxy(myWorker().left)
      if myWorker().rightIsWaiting:
        guessThieves += approxNumThievesProxy(myWorker().right)
    # If not "workSharing" we also just dequeued the steal request currently being considered
    # so we need to add it back
    return splitAdaptative(task, 1+guessThieves)
  else:
    {.error: "Unreachable".}

proc splitAndSend*(task: Task, req: sink StealRequest, workSharing: bool) =
## Split a task and send a part to the thief
preCondition: req.thiefID != myID()

Expand All @@ -209,13 +243,7 @@ proc splitAndSend*(task: Task, req: sink StealRequest) =

# Split iteration range according to given strategy
# [start, stop) => [start, split) + [split, end)
var guessThieves = approxNumThieves()
Backoff:
if myWorker().leftIsWaiting:
guessThieves += approxNumThievesProxy(myWorker().left)
if myWorker().rightIsWaiting:
guessThieves += approxNumThievesProxy(myWorker().right)
let split = split(task, guessThieves)
let split = evalSplit(task, req, workSharing)

# New task gets the upper half
upperSplit.start = split
Expand All @@ -225,7 +253,9 @@ proc splitAndSend*(task: Task, req: sink StealRequest) =
# Current task continues with lower half
task.stop = split

debug: log("Worker %2d: Sending [%ld, %ld) to worker %d\n", myID(), upperSplit.start, upperSplit.stop, req.thiefID)
debug:
let steps = (upperSplit.stop-upperSplit.start + upperSplit.stride-1) div upperSplit.stride
log("Worker %2d: Sending [%ld, %ld) to worker %d (%d steps)\n", myID(), upperSplit.start, upperSplit.stop, req.thiefID, steps)

profile(send_recv_task):
if upperSplit.hasFuture:
Expand All @@ -244,10 +274,12 @@ proc splitAndSend*(task: Task, req: sink StealRequest) =
req.send(upperSplit)

incCounter(tasksSplit)
debug: log("Worker %2d: Continuing with [%ld, %ld)\n", myID(), task.cur, task.stop)
debug:
let steps = (task.stop-task.cur + task.stride-1) div task.stride
log("Worker %2d: Continuing with [%ld, %ld) (%d steps)\n", myID(), task.cur, task.stop, steps)

proc distributeWork(req: sink StealRequest): bool =
## Handle incoming steal request
## Handle children work sharing requests
## Returns true if we found work
## false otherwise

Expand All @@ -263,7 +295,7 @@ proc distributeWork(req: sink StealRequest): bool =
# Otherwise try to split the current one
if myTask().isSplittable():
if req.thiefID != myID():
myTask().splitAndSend(req)
myTask().splitAndSend(req, workSharing = true)
return true
else:
req.forget()
Expand Down
2 changes: 1 addition & 1 deletion weave/workers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ proc runTask*(task: Task) {.inline, gcsafe.} =
# TODO - logic seems sketchy, why do we do this <-> task.
let this = myTask()
myTask() = task
debug: log("Worker %2d: running task.fn 0x%.08x\n", myID(), task.fn)
debug: log("Worker %2d: running task.fn 0x%.08x (%d pending)\n", myID(), task.fn, myWorker().deque.pendingTasks)
task.fn(task.data.addr)
myTask() = this
if task.isLoop:
Expand Down

0 comments on commit 9e66261

Please sign in to comment.