Skip to content

Commit

Permalink
Fix uninitialized pointer for tasks with multiple dependencies (#109)
Browse files Browse the repository at this point in the history
* Try to isolate the recurrent nil pledge bug on Travis macOS

* Remove some weird delaying logic

* Try to determine whether the Travis bug is linked to #97

* more verbosity

* nextDep wasn't properly zero-initialized

* remove a repr

* missed another "nil"

* Cleanup investigative changes
  • Loading branch information
mratsim authored Apr 4, 2020
1 parent d93fce2 commit 8a0f744
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 19 deletions.
55 changes: 36 additions & 19 deletions weave/channels/pledges.nim
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ proc `=`*(dst: var Pledge, src: Pledge) {.inline.} =
proc delayedUntilSingle(taskNode: TaskNode, curTask: Task): bool =
## Redelay a task that depends on multiple pledges
## with 1 or more pledges fulfilled but some still unfulfilled.
## field is a place holder for impl / impls[bucket]
##
## Returns `true` if the task has been delayed.
## The task should not be accessed anymore by the current worker.
## Returns `false` if the task can be scheduled right away by the current worker thread.
preCondition: not taskNode.pledge.p.isNil

if taskNode.pledge.p.impl.fulfilled.load(moRelaxed):
Expand All @@ -230,17 +233,17 @@ proc delayedUntilSingle(taskNode: TaskNode, curTask: Task): bool =
return false

# Send the task to the pledge fulfiller
taskNode.task = curTask
let pledge = taskNode.pledge
taskNode.pledge = default(Pledge)
discard pledge.p.impl.chan.trySend(taskNode)
discard pledge.p.impl.deferredOut.fetchAdd(1, moRelaxed)
discard taskNode.pledge.p.impl.chan.trySend(taskNode)
discard taskNode.pledge.p.impl.deferredOut.fetchAdd(1, moRelaxed)
return true

proc delayedUntilIter(taskNode: TaskNode, curTask: Task): bool =
## Redelay a task that depends on multiple pledges
## with 1 or more pledges fulfilled but some still unfulfilled.
## field is a place holder for impl / impls[bucket]
##
## Returns `true` if the task has been delayed.
## The task should not be accessed anymore by the current worker.
## Returns `false` if the task can be scheduled right away by the current worker thread.
preCondition: not taskNode.pledge.p.isNil

if taskNode.pledge.p.impls[taskNode.bucketID].fulfilled.load(moRelaxed):
Expand All @@ -256,17 +259,19 @@ proc delayedUntilIter(taskNode: TaskNode, curTask: Task): bool =
return false

# Send the task to the pledge fulfiller
taskNode.task = curTask
let pledge = taskNode.pledge
taskNode.pledge = default(Pledge)
discard pledge.p.impls[taskNode.bucketID].chan.trySend(taskNode)
discard pledge.p.impls[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed)
discard taskNode.pledge.p.impls[taskNode.bucketID].chan.trySend(taskNode)
discard taskNode.pledge.p.impls[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed)
return true

proc delayedUntil*(taskNode: TaskNode, curTask: Task): bool =
## Redelay a task that depends on multiple pledges
## Redelay a task that depends on multiple pledges (in the `taskNode` linked list)
## with 1 or more pledges fulfilled but some still unfulfilled.
##
## Returns `true` if the task has been delayed.
## The task should not be accessed anymore by the current worker.
## Returns `false` if the task can be scheduled right away by the current worker thread.
preCondition: not taskNode.pledge.p.isNil
preCondition: bool(taskNode.task == curTask)
if taskNode.pledge.p.kind == Single:
delayedUntilSingle(taskNode, curTask)
else:
Expand All @@ -289,9 +294,10 @@ proc initialize*(pledge: var Pledge, pool: var TLPoolAllocator) =

proc delayedUntil*(task: Task, pledge: Pledge, pool: var TLPoolAllocator): bool =
## Defers a task until a pledge is fulfilled
## Returns true if the task has been delayed.
## The task should not be accessed anymore
## Returns false if the task can be scheduled right away.
##
## Returns `true` if the task has been delayed.
## The task should not be accessed anymore by the current worker.
## Returns `false` if the task can be scheduled right away by the current worker thread.
preCondition: not pledge.p.isNil
preCondition: pledge.p.kind == Single

Expand All @@ -312,6 +318,7 @@ proc delayedUntil*(task: Task, pledge: Pledge, pool: var TLPoolAllocator): bool
let taskNode = pool.borrow(deref(TaskNode))
taskNode.task = task
taskNode.next.store(nil, moRelaxed)
taskNode.nextDep = nil
taskNode.pledge = default(Pledge) # Don't need to store the pledge reference if there is only the current one
taskNode.bucketID = NoIter
discard pledge.p.impl.chan.trySend(taskNode)
Expand Down Expand Up @@ -341,6 +348,7 @@ template fulfillImpl*(pledge: Pledge, queue, enqueue: typed) =
var task: Task
var taskNode: TaskNode
while pledge.p.impl.chan.tryRecv(taskNode):
ascertain: not taskNode.isNil
ascertain: taskNode.bucketID == NoIter
task = taskNode.task
var wasDelayed = false
Expand Down Expand Up @@ -395,9 +403,10 @@ proc getBucket(pledge: Pledge, index: int32): int32 {.inline.} =

proc delayedUntil*(task: Task, pledge: Pledge, index: int32, pool: var TLPoolAllocator): bool =
## Defers a task until a pledge[index] is fulfilled
## Returns true if the task has been delayed.
## The task should not be accessed anymore
## Returns false if the task can be scheduled right away.
##
## Returns `true` if the task has been delayed.
## The task should not be accessed anymore by the current worker.
## Returns `false` if the task can be scheduled right away by the current worker thread.
preCondition: not pledge.p.isNil
preCondition: pledge.p.kind == Iteration

Expand All @@ -420,6 +429,7 @@ proc delayedUntil*(task: Task, pledge: Pledge, index: int32, pool: var TLPoolAll
let taskNode = pool.borrow(deref(TaskNode))
taskNode.task = task
taskNode.next.store(nil, moRelaxed)
taskNode.nextDep = nil
taskNode.pledge = default(Pledge) # Don't need to store the pledge reference if there is only the current one
taskNode.bucketID = bucket
discard pledge.p.impls[bucket].chan.trySend(taskNode)
Expand Down Expand Up @@ -451,6 +461,7 @@ template fulfillIterImpl*(pledge: Pledge, index: int32, queue, enqueue: typed) =
var task {.inject.}: Task
var taskNode: TaskNode
while pledge.p.impls[bucket].chan.tryRecv(taskNode):
ascertain: not taskNode.isNil
ascertain: taskNode.bucketID != NoIter
task = taskNode.task
var wasDelayed = false
Expand Down Expand Up @@ -503,6 +514,8 @@ macro delayedUntilMulti*(task: Task, pool: var TLPoolAllocator, pledges: varargs
taskNodeInit.add quote do:
`taskNode`.nextDep = `prevNode`
else:
taskNodeInit.add quote do:
`taskNode`.nextDep = nil
firstNode = taskNode
prevnode = taskNode
taskNodesInitStmt.add taskNodeInit
Expand Down Expand Up @@ -591,6 +604,8 @@ when isMainModule:
pledge2.fulfillImpl(stack, add)
doAssert stack.count == 3

echo "Simple pledge: SUCCESS"

mainSingle()

proc mainLoop() =
Expand Down Expand Up @@ -633,4 +648,6 @@ when isMainModule:
pledge2.fulfillIterImpl(4, stack, add)
doAssert stack.count == 3

echo "Loop pledge: SUCCESS"

mainLoop()
1 change: 1 addition & 0 deletions weave/parallel_tasks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -312,5 +312,6 @@ when isMainModule:
spawnDelayed pA, echoB1(pB1)
spawn echoA(pA)
exit(Weave)
echo "Weave runtime exited"

main()

0 comments on commit 8a0f744

Please sign in to comment.