From 8a0f7440f457ea83d8f98de647741dd4b0d77f1d Mon Sep 17 00:00:00 2001 From: Mamy Ratsimbazafy Date: Sat, 4 Apr 2020 19:48:17 +0200 Subject: [PATCH] Fix uninitialized pointer for tasks with multiple dependencies (#109) * Try to isolate the recurrent nil pledge bug on Travis MacOS * Remove some weird delaying logic * Try to sense if Travis bug is linked to https://github.com/mratsim/weave/issues/97 * more verbosity * nextDep wasn't properly zero-initialized * remove a repr * missed another "nil" * Cleanup investigative changes --- weave/channels/pledges.nim | 55 +++++++++++++++++++++++++------------- weave/parallel_tasks.nim | 1 + 2 files changed, 37 insertions(+), 19 deletions(-) diff --git a/weave/channels/pledges.nim b/weave/channels/pledges.nim index 4c2f3ad..dd622a2 100644 --- a/weave/channels/pledges.nim +++ b/weave/channels/pledges.nim @@ -214,7 +214,10 @@ proc `=`*(dst: var Pledge, src: Pledge) {.inline.} = proc delayedUntilSingle(taskNode: TaskNode, curTask: Task): bool = ## Redelay a task that depends on multiple pledges ## with 1 or more pledge fulfilled but still some unfulfilled. - ## field is a place holder for impl / impls[bucket] + ## + ## Returns `true` if the task has been delayed. + ## The task should not be accessed anymore by the current worker. + ## Returns `false` if the task can be scheduled right away by the current worker thread. preCondition: not taskNode.pledge.p.isNil if taskNode.pledge.p.impl.fulfilled.load(moRelaxed): @@ -230,17 +233,17 @@ proc delayedUntilSingle(taskNode: TaskNode, curTask: Task): bool = return false # Send the task to the pledge fulfiller - taskNode.task = curTask - let pledge = taskNode.pledge - taskNode.pledge = default(Pledge) - discard pledge.p.impl.chan.trySend(taskNode) - discard pledge.p.impl.deferredOut.fetchAdd(1, moRelaxed) + discard taskNode.pledge.p.impl.chan.trySend(taskNode) + discard taskNode.pledge.p.impl.deferredOut.fetchAdd(1, moRelaxed) return true proc delayedUntilIter(taskNode: TaskNode, curTask: Task): bool = ## Redelay a task that depends on multiple pledges ## with 1 or more pledge fulfilled but still some unfulfilled. - ## field is a place holder for impl / impls[bucket] + ## + ## Returns `true` if the task has been delayed. + ## The task should not be accessed anymore by the current worker. + ## Returns `false` if the task can be scheduled right away by the current worker thread. preCondition: not taskNode.pledge.p.isNil if taskNode.pledge.p.impls[taskNode.bucketID].fulfilled.load(moRelaxed): @@ -256,17 +259,19 @@ proc delayedUntilIter(taskNode: TaskNode, curTask: Task): bool = return false # Send the task to the pledge fulfiller - taskNode.task = curTask - let pledge = taskNode.pledge - taskNode.pledge = default(Pledge) - discard pledge.p.impls[taskNode.bucketID].chan.trySend(taskNode) - discard pledge.p.impls[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed) + discard taskNode.pledge.p.impls[taskNode.bucketID].chan.trySend(taskNode) + discard taskNode.pledge.p.impls[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed) return true proc delayedUntil*(taskNode: TaskNode, curTask: Task): bool = - ## Redelay a task that depends on multiple pledges + ## Redelay a task that depends on multiple pledges (in the `taskNode` linked list) ## with 1 or more pledge fulfilled but still some unfulfilled. + ## + ## Returns `true` if the task has been delayed. + ## The task should not be accessed anymore by the current worker. + ## Returns `false` if the task can be scheduled right away by the current worker thread. preCondition: not taskNode.pledge.p.isNil + preCondition: bool(taskNode.task == curTask) if taskNode.pledge.p.kind == Single: delayedUntilSingle(taskNode, curTask) else: @@ -289,9 +294,10 @@ proc initialize*(pledge: var Pledge, pool: var TLPoolAllocator) = proc delayedUntil*(task: Task, pledge: Pledge, pool: var TLPoolAllocator): bool = ## Defers a task until a pledge is fulfilled - ## Returns true if the task has been delayed. - ## The task should not be accessed anymore - ## Returns false if the task can be scheduled right away. + ## + ## Returns `true` if the task has been delayed. + ## The task should not be accessed anymore by the current worker. + ## Returns `false` if the task can be scheduled right away by the current worker thread. preCondition: not pledge.p.isNil preCondition: pledge.p.kind == Single @@ -312,6 +318,7 @@ proc delayedUntil*(task: Task, pledge: Pledge, pool: var TLPoolAllocator): bool let taskNode = pool.borrow(deref(TaskNode)) taskNode.task = task taskNode.next.store(nil, moRelaxed) + taskNode.nextDep = nil taskNode.pledge = default(Pledge) # Don't need to store the pledge reference if there is only the current one taskNode.bucketID = NoIter discard pledge.p.impl.chan.trySend(taskNode) @@ -341,6 +348,7 @@ template fulfillImpl*(pledge: Pledge, queue, enqueue: typed) = var task: Task var taskNode: TaskNode while pledge.p.impl.chan.tryRecv(taskNode): + ascertain: not taskNode.isNil ascertain: taskNode.bucketID == NoIter task = taskNode.task var wasDelayed = false @@ -395,9 +403,10 @@ proc getBucket(pledge: Pledge, index: int32): int32 {.inline.} = proc delayedUntil*(task: Task, pledge: Pledge, index: int32, pool: var TLPoolAllocator): bool = ## Defers a task until a pledge[index] is fulfilled - ## Returns true if the task has been delayed. - ## The task should not be accessed anymore - ## Returns false if the task can be scheduled right away. + ## + ## Returns `true` if the task has been delayed. + ## The task should not be accessed anymore by the current worker. + ## Returns `false` if the task can be scheduled right away by the current worker thread. preCondition: not pledge.p.isNil preCondition: pledge.p.kind == Iteration @@ -420,6 +429,7 @@ proc delayedUntil*(task: Task, pledge: Pledge, index: int32, pool: var TLPoolAll let taskNode = pool.borrow(deref(TaskNode)) taskNode.task = task taskNode.next.store(nil, moRelaxed) + taskNode.nextDep = nil taskNode.pledge = default(Pledge) # Don't need to store the pledge reference if there is only the current one taskNode.bucketID = bucket discard pledge.p.impls[bucket].chan.trySend(taskNode) @@ -451,6 +461,7 @@ template fulfillIterImpl*(pledge: Pledge, index: int32, queue, enqueue: typed) = var task {.inject.}: Task var taskNode: TaskNode while pledge.p.impls[bucket].chan.tryRecv(taskNode): + ascertain: not taskNode.isNil ascertain: taskNode.bucketID != NoIter task = taskNode.task var wasDelayed = false @@ -503,6 +514,8 @@ macro delayedUntilMulti*(task: Task, pool: var TLPoolAllocator, pledges: varargs taskNodeInit.add quote do: `taskNode`.nextDep = `prevNode` else: + taskNodeInit.add quote do: + `taskNode`.nextDep = nil firstNode = taskNode prevnode = taskNode taskNodesInitStmt.add taskNodeInit @@ -591,6 +604,8 @@ when isMainModule: pledge2.fulfillImpl(stack, add) doAssert stack.count == 3 + echo "Simple pledge: SUCCESS" + mainSingle() proc mainLoop() = @@ -633,4 +648,6 @@ when isMainModule: pledge2.fulfillIterImpl(4, stack, add) doAssert stack.count == 3 + echo "Loop pledge: SUCCESS" + mainLoop() diff --git a/weave/parallel_tasks.nim b/weave/parallel_tasks.nim index 51074b8..f673746 100644 --- a/weave/parallel_tasks.nim +++ b/weave/parallel_tasks.nim @@ -312,5 +312,6 @@ when isMainModule: spawnDelayed pA, echoB1(pB1) spawn echoA(pA) exit(Weave) + echo "Weave runtime exited" main()