From bd797eb9c1a10467778284178a8d606cbfca1045 Mon Sep 17 00:00:00 2001 From: Mamy Ratsimbazafy Date: Sun, 17 May 2020 20:27:51 +0200 Subject: [PATCH] Flow events (+ support destructible types like refcounting in tasks) (#144) * Rename Pladge to FlowEvent [skip ci] * update contexts * Finish renaming pledges --> flow events * zeroMem tasks: support destructors and refcounted types in Weave * Overhead of zero mem is too high (17%) log a TODO item * Have both spawnOnEvent and spawnOnEvents because grammar is beautiful * Update README and test variable names --- README.md | 87 ++-- .../gemm_pure_nim/gemm_packing_weave.nim | 4 +- .../gemm_pure_nim/gemm_weave.nim | 4 +- .../gemm_pure_nim/gemm_weave_nestable.nim | 6 +- changelog.md | 14 + tests/test_background_jobs.nim | 46 +- weave.nim | 8 +- weave.nimble | 4 +- weave/contexts.nim | 97 ++-- .../channels_mpsc_unbounded_batch.nim | 2 +- .../{pledges.nim => flow_events.nim} | 458 +++++++++--------- weave/datatypes/prell_deques.nim | 3 +- weave/executor.nim | 2 +- weave/memory/lookaside_lists.nim | 8 + weave/memory/memory_pools.nim | 16 +- weave/parallel_for.nim | 40 +- weave/parallel_jobs.nim | 93 ++-- weave/parallel_macros.nim | 24 +- weave/parallel_tasks.nim | 95 ++-- weave/state_machines/dispatch_events.nim | 2 +- 20 files changed, 550 insertions(+), 463 deletions(-) rename weave/cross_thread_com/{pledges.nim => flow_events.nim} (50%) diff --git a/README.md b/README.md index 5ca818b5..c9d38dbe 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ instead of being based on traditional work-stealing with shared-memory deques. > to be deadlock-free. They were not submitted to an additional data race > detection tool to ensure proper implementation. > -> Furthermore worker threads are basically actors or state-machines and +> Furthermore worker threads are state-machines and > were not formally verified either. > > Weave does limit synchronization to only simple SPSC and MPSC channels which greatly reduces @@ -170,7 +170,7 @@ The root thread is also a worker thread. Worker threads are tuned to maximize throughput of computational **tasks**. - `spawn fnCall(args)` which spawns a function that may run on another thread and gives you an awaitable `Flowvar` handle. -- `newPledge`, `fulfill` and `spawnDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships. +- `newFlowEvent`, `trigger`, `spawnOnEvent` and `spawnOnEvents` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships. - `sync(Flowvar)` will await a Flowvar and block until you receive a result. - `isReady(Flowvar)` will check if `sync` will actually block or return the result immediately. @@ -224,7 +224,7 @@ and for shutdown Once setup, a foreign thread can submit jobs via: - `submit fnCall(args)` which submits a function to the Weave runtime and gives you an awaitable `Pending` handle. -- `newPledge`, `fulfill` and `submitDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships. +- `newFlowEvent`, `trigger` and `submitDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships. 
- `waitFor(Pending)` which await a Pending job result and blocks the current thread - `isReady(Pending)` will check if `waitFor` will actually block or return the result immediately. - `isSubmitted(job)` allows you to build speculative algorithm where a job is submitted only if certain conditions are valid. @@ -473,31 +473,38 @@ In the literature, it is also called: Tagged experimental as the API and its implementation are unique compared to other libraries/language-extensions. Feedback welcome. -No specific ordering is required between calling the pledge producer and its consumer(s). +No specific ordering is required between calling the event producer and its consumer(s). -Dependencies are expressed by a handle called `Pledge`. -A pledge can express either a single dependency, initialized with `newPledge()` -or a dependencies on parallel for loop iterations, initialized with `newPledge(start, exclusiveStop, stride)` +Dependencies are expressed by a handle called `FlowEvent`. +An flow event can express either a single dependency, initialized with `newFlowEvent()` +or a dependencies on parallel for loop iterations, initialized with `newFlowEvent(start, exclusiveStop, stride)` -To await on a single pledge `singlePledge` pass it to `spawnDelayed` or the `parallelFor` invocation. -To await on an iteration `iterPledge`, pass a tuple: -- `(iterPledge, 0)` to await precisely and only for iteration 0. This works with both `spawnDelayed` or `parallelFor` -- `(iterPledge, myIndex)` to await on a whole iteration range. This only works with `parallelFor`. The `Pledge` iteration domain and the `parallelFor` domain must be the same. As soon as a subset of the pledge is ready, the corresponding `parallelFor` tasks will be scheduled. +To await on a single event pass it to `spawnOnEvent` or the `parallelFor` invocation. +To await on an iteration, pass a tuple: +- `(FlowEvent, 0)` to await precisely and only for iteration 0. This works with both `spawnOnEvent` or `parallelFor` (via a dependsOnEvent statement) +- `(FlowEvent, loop_index_variable)` to await on a whole iteration range. + For example + ```Nim + parallelFor i in 0 ..< n: + dependsOnEvent: (e, i) # Each "i" will independently depends on their matching event + body + ``` + This only works with `parallelFor`. The `FlowEvent` iteration domain and the `parallelFor` domain must be the same. As soon as a subset of the pledge is ready, the corresponding `parallelFor` tasks will be scheduled. 
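For example, awaiting one precise iteration from outside the loop could look like the sketch below (illustrative only: `afterFirstChunk` is a made-up consumer, and the `(event, index)` tuple form is the one described above):

```Nim
import weave

proc afterFirstChunk() =
  echo "Runs only once iteration 0 of the producer loop has been triggered"

proc main() =
  init(Weave)
  let e = newFlowEvent(0, 10, 1)          # one event bucket per iteration of 0 ..< 10
  spawnOnEvent (e, 0), afterFirstChunk()  # deferred until iteration 0 is triggered
  parallelFor i in 0 ..< 10:
    captures: {e}
    # ... produce chunk i ...
    e.trigger(i)                          # iteration i is now available to consumers
  exit(Weave)

main()
```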
#### Delayed computation with single dependencies ```Nim import weave -proc echoA(pA: Pledge) = +proc echoA(eA: FlowEvent) = echo "Display A, sleep 1s, create parallel streams 1 and 2" sleep(1000) - pA.fulfill() + eA.trigger() -proc echoB1(pB1: Pledge) = +proc echoB1(eB1: FlowEvent) = echo "Display B1, sleep 1s" sleep(1000) - pB1.fulfill() + eB1.trigger() proc echoB2() = echo "Display B2, exit stream" @@ -508,12 +515,12 @@ proc echoC1() = proc main() = echo "Dataflow parallelism with single dependency" init(Weave) - let pA = newPledge() - let pB1 = newPledge() - spawnDelayed pB1, echoC1() - spawnDelayed pA, echoB2() - spawnDelayed pA, echoB1(pB1) - spawn echoA(pA) + let eA = newFlowEvent() + let eB1 = newFlowEvent() + spawnOnEvent eB1, echoC1() + spawnOnEvent eA, echoB2() + spawnOnEvent eA, echoB1(eB1) + spawn echoA(eA) exit(Weave) main() @@ -524,19 +531,19 @@ main() ```Nim import weave -proc echoA(pA: Pledge) = +proc echoA(eA: FlowEvent) = echo "Display A, sleep 1s, create parallel streams 1 and 2" sleep(1000) - pA.fulfill() + eA.trigger() -proc echoB1(pB1: Pledge) = +proc echoB1(eB1: FlowEvent) = echo "Display B1, sleep 1s" sleep(1000) - pB1.fulfill() + eB1.trigger() -proc echoB2(pB2: Pledge) = +proc echoB2(eB2: FlowEvent) = echo "Display B2, no sleep" - pB2.fulfill() + eB2.trigger() proc echoC12() = echo "Display C12, exit stream" @@ -544,13 +551,13 @@ proc echoC12() = proc main() = echo "Dataflow parallelism with multiple dependencies" init(Weave) - let pA = newPledge() - let pB1 = newPledge() - let pB2 = newPledge() - spawnDelayed pB1, pB2, echoC12() - spawnDelayed pA, echoB2(pB2) - spawnDelayed pA, echoB1(pB1) - spawn echoA(pA) + let eA = newFlowEvent() + let eB1 = newFlowEvent() + let eB2 = newFlowEvent() + spawnOnEvents eB1, eB2, echoC12() + spawnOnEvent eA, echoB2(eB2) + spawnOnEvent eA, echoB1(eB1) + spawn echoA(eA) exit(Weave) main() @@ -570,20 +577,20 @@ import weave proc main() = init(Weave) - let pA = newPledge(0, 10, 1) - let pB = newPledge(0, 10, 1) + let eA = newFlowEvent(0, 10, 1) + let pB = newFlowEvent(0, 10, 1) parallelFor i in 0 ..< 10: - captures: {pA} + captures: {eA} sleep(i * 10) - pA.fulfill(i) + eA.trigger(i) echo "Step A - stream ", i, " at ", i * 10, " ms" parallelFor i in 0 ..< 10: - dependsOn: (pA, i) + dependsOn: (eA, i) captures: {pB} sleep(i * 10) - pB.fulfill(i) + pB.trigger(i) echo "Step B - stream ", i, " at ", 2 * i * 10, " ms" parallelFor i in 0 ..< 10: diff --git a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim index 555d045f..4d86cc83 100644 --- a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim +++ b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_packing_weave.nim @@ -62,7 +62,7 @@ proc pack_A_mc_kc*[T; ukernel: static MicroKernel]( proc pack_B_kc_nc*[T; ukernel: static MicroKernel]( packedB: ptr UncheckedArray[T], kc, nc: int, - B: MatrixView[T], kcTileReady: Pledge) = + B: MatrixView[T], kcTileReady: FlowEvent) = ## Packs panel [kc, nc] for ~B (half-L1 cache) ## Pads if needed ## @@ -97,4 +97,4 @@ proc pack_B_kc_nc*[T; ukernel: static MicroKernel]( # so waiting there guarantees proper data dependencies # provided the "k" loop is not nested (i.e. 
does real work instead of enqueueing tasks) discard sync(kcLoop) - kcTileReady.fulfill() + kcTileReady.trigger() diff --git a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim index b6172f6c..554a8fb6 100644 --- a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim +++ b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim @@ -155,7 +155,7 @@ proc gemm_impl[T; ukernel: static MicroKernel]( prefetch(tiles.b, Write, LowTemporalLocality) let kc = min(K - pc, tiles.kc) # Deal with edges # A[0:M, pc:pc+kc] - let kcncTileReady = newPledge() + let kcncTileReady = newFlowEvent() let kcncB = vB.stride(pc, 0) # B[pc:pc+kc, jc:jc+nc] spawn pack_B_kc_nc[T, ukernel]( # PackB panel [kc, nc] (nc is large or unknown) tiles.b, kc, nc, kcncB, kcncTileReady) @@ -176,7 +176,7 @@ proc gemm_impl[T; ukernel: static MicroKernel]( let mckcA = vA.stride(ic, pc) # A[ic:ic+mc, pc:pc+kc] pack_A_mc_kc[T, ukernel](packA, mc, kc, mckcA) # PackA block [mc, kc] - spawnDelayed( + spawnOnEvent( kcncTileReady, gebp_mkernel[T, ukernel]( # GEBP macrokernel: mc, nc, kc, # C[ic:ic+mc, jc:jc+nc] = diff --git a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave_nestable.nim b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave_nestable.nim index 08ea4a92..8e06fc93 100644 --- a/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave_nestable.nim +++ b/benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave_nestable.nim @@ -153,7 +153,7 @@ proc gemm_impl[T; ukernel: static MicroKernel]( for pc in countup(0, K-1, tiles.kc): # Explanation of the control flow: # 1. syncScope -> only trigger the next `pc` iteration once the current one is finished - # 2. kcncTileReady -> packing B is asynchronously spawned, kcNcTileReady is a pledge, once fulfilled + # 2. kcncTileReady -> packing B is asynchronously spawned, kcNcTileReady is a FlowEvent, once triggered # computation dependent on packing B can proceed # 3. parallelFor icb -> is an async parallel for loop that may or may not be split in multiple thread # 4. 
spawnDelayed -> the packing of A is done synchronously in the current thread @@ -162,7 +162,7 @@ proc gemm_impl[T; ukernel: static MicroKernel]( prefetch(tiles.b, Write, LowTemporalLocality) let kc = min(K - pc, tiles.kc) # Deal with edges # A[0:M, pc:pc+kc] - let kcncTileReady = newPledge() + let kcncTileReady = newFlowEvent() let kcncB = vB.stride(pc, 0) # B[pc:pc+kc, jc:jc+nc] spawn pack_B_kc_nc[T, ukernel]( # PackB panel [kc, nc] (nc is large or unknown) tiles.b, kc, nc, kcncB, kcncTileReady) @@ -183,7 +183,7 @@ proc gemm_impl[T; ukernel: static MicroKernel]( let mckcA = vA.stride(ic, pc) # A[ic:ic+mc, pc:pc+kc] pack_A_mc_kc[T, ukernel](packA, mc, kc, mckcA) # PackA block [mc, kc] - spawnDelayed( + spawnOnEvent( kcncTileReady, gebp_mkernel[T, ukernel]( # GEBP macrokernel: mc, nc, kc, # C[ic:ic+mc, jc:jc+nc] = diff --git a/changelog.md b/changelog.md index 5af8d9eb..ba5c387d 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,20 @@ ### v0.x.x - Unreleased +#### Breaking + +- The experimental `Pledge` for dataflow parallelism have been renamed + `FlowEvent` to be in line with: + - `AsyncEvent` in Nim async frameworks + - `cudaEvent_t` in CUDA + - `cl_event` in OpenCL + + Renaming changes: + - `newPledge()` becomes `newFlowEvent()` + - `fulfill()` becomes `trigger()` + - `spawnDelayed()` becomes `spawnOnEvents()` + - The `dependsOn` clause in `parallelFor` becomes `dependsOnEvent` + #### Features - Added `isReady(Flowvar)` which will return true is `sync` would block on that Flowvar or if the result is actually immediately available. diff --git a/tests/test_background_jobs.nim b/tests/test_background_jobs.nim index 3a319d0c..5ef4764b 100644 --- a/tests/test_background_jobs.nim +++ b/tests/test_background_jobs.nim @@ -71,15 +71,15 @@ proc main() = block: # Delayed computation serviceDone.store(false, moRelaxed) - proc echoA(pA: Pledge) = + proc echoA(eA: FlowEvent) = echo "Display A, sleep 1s, create parallel streams 1 and 2" sleep(1000) - pA.fulfill() + eA.trigger() - proc echoB1(pB1: Pledge) = + proc echoB1(eB1: FlowEvent) = echo "Display B1, sleep 1s" sleep(1000) - pB1.fulfill() + eB1.trigger() proc echoB2() = echo "Display B2, exit stream" @@ -92,12 +92,12 @@ proc main() = waitUntilReady(Weave) echo "Sanity check 3: Dataflow parallelism" - let pA = newPledge() - let pB1 = newPledge() - let done = submitDelayed(pB1, echoC1()) - submitDelayed pA, echoB2() - submitDelayed pA, echoB1(pB1) - submit echoA(pA) + let eA = newFlowEvent() + let eB1 = newFlowEvent() + let done = submitOnEvent(eB1, echoC1()) + submitOnEvent eA, echoB2() + submitOnEvent eA, echoB1(eB1) + submit echoA(eA) discard waitFor(done) serviceDone[].store(true, moRelaxed) @@ -109,19 +109,19 @@ proc main() = block: # Delayed computation with multiple dependencies serviceDone.store(false, moRelaxed) - proc echoA(pA: Pledge) = + proc echoA(eA: FlowEvent) = echo "Display A, sleep 1s, create parallel streams 1 and 2" sleep(1000) - pA.fulfill() + eA.trigger() - proc echoB1(pB1: Pledge) = + proc echoB1(eB1: FlowEvent) = echo "Display B1, sleep 1s" sleep(1000) - pB1.fulfill() + eB1.trigger() - proc echoB2(pB2: Pledge) = + proc echoB2(eB2: FlowEvent) = echo "Display B2, no sleep" - pB2.fulfill() + eB2.trigger() proc echoC12(): bool = echo "Display C12, exit stream" @@ -132,13 +132,13 @@ proc main() = waitUntilReady(Weave) echo "Sanity check 4: Dataflow parallelism with multiple dependencies" - let pA = newPledge() - let pB1 = newPledge() - let pB2 = newPledge() - let done = submitDelayed(pB1, pB2, echoC12()) - submitDelayed pA, 
echoB2(pB2) - submitDelayed pA, echoB1(pB1) - submit echoA(pA) + let eA = newFlowEvent() + let eB1 = newFlowEvent() + let eB2 = newFlowEvent() + let done = submitOnEvents(eB1, eB2, echoC12()) + submitOnEvent eA, echoB2(eB2) + submitOnEvent eA, echoB1(eB1) + submit echoA(eA) discard waitFor(done) serviceDone[].store(true, moRelaxed) diff --git a/weave.nim b/weave.nim index f8c45389..a1fc1b89 100644 --- a/weave.nim +++ b/weave.nim @@ -11,7 +11,7 @@ import runtime], weave/state_machines/[sync_root, sync, sync_scope], weave/datatypes/flowvars, - weave/cross_thread_com/pledges, + weave/cross_thread_com/flow_events, weave/contexts, weave/[executor, parallel_jobs] @@ -28,11 +28,11 @@ export # Experimental scope barrier syncScope, # Experimental dataflow parallelism - spawnDelayed, Pledge, - fulfill, newPledge, + spawnOnEvent, spawnOnEvents, FlowEvent, + trigger, newFlowEvent, # Experimental background service Pending, - submit, submitDelayed, + submit, submitOnEvent, submitOnEvents, runInBackground, waitUntilReady, setupSubmitterThread, teardownSubmitterThread, waitFor, isSubmitted, diff --git a/weave.nimble b/weave.nimble index 2a242f68..1d9b1e27 100644 --- a/weave.nimble +++ b/weave.nimble @@ -30,7 +30,7 @@ task test, "Run Weave tests": test "", "weave/cross_thread_com/channels_spsc_single.nim" test "", "weave/cross_thread_com/channels_spsc_single_ptr.nim" test "", "weave/cross_thread_com/channels_mpsc_unbounded_batch.nim" - test "", "weave/cross_thread_com/pledges.nim" + test "", "weave/cross_thread_com/flow_events.nim" test "", "weave/datatypes/binary_worker_trees.nim" test "", "weave/datatypes/bounded_queues.nim" @@ -96,7 +96,7 @@ task test_gc_arc, "Run Weave tests with --gc:arc": test "--gc:arc", "weave/cross_thread_com/channels_spsc_single.nim" test "--gc:arc", "weave/cross_thread_com/channels_spsc_single_ptr.nim" test "--gc:arc", "weave/cross_thread_com/channels_mpsc_unbounded_batch.nim" - test "--gc:arc", "weave/cross_thread_com/pledges.nim" + test "--gc:arc", "weave/cross_thread_com/flow_events.nim" test "--gc:arc", "weave/datatypes/binary_worker_trees.nim" test "--gc:arc", "weave/datatypes/bounded_queues.nim" diff --git a/weave/contexts.nim b/weave/contexts.nim index b41a1a98..f478bab9 100644 --- a/weave/contexts.nim +++ b/weave/contexts.nim @@ -7,7 +7,7 @@ import ./datatypes/[context_global, context_thread_local, sync_types, prell_deques, binary_worker_trees], - ./cross_thread_com/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, scoped_barriers, pledges], + ./cross_thread_com/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, scoped_barriers, flow_events], ./memory/[persistacks, lookaside_lists, memory_pools, allocs], ./config, ./instrumentation/[profilers, loggers, contracts] @@ -111,23 +111,30 @@ Backoff: # ---------------------------------------------------------------------------------- proc newTaskFromCache*(): Task = - result = workerContext.taskCache.pop() + result = workerContext.taskCache.pop0() if result.isNil: - result = myMemPool().borrow(deref(Task)) - # Zeroing is expensive, it's 96 bytes - - # result.fn = nil # Always overwritten - # result.parent = nil # Always overwritten - # result.scopedBarrier = nil # Always overwritten - result.prev = nil - result.next = nil - result.start = 0 - result.cur = 0 - result.stop = 0 - result.stride = 0 - result.futures = nil - result.isLoop = false - result.hasFuture = false + result = myMemPool().borrow0(deref(Task)) + # The task must be fully zero-ed including the data buffer + # otherwise datatypes that use custom 
destructors + # and that rely on "myPointer.isNil" to return early + # may read recycled garbage data. + # "FlowEvent" is such an example + + # TODO: The perf cost to the following is 17% as measured on fib(40) + + # # Zeroing is expensive, it's 96 bytes + # # result.fn = nil # Always overwritten + # # result.parent = nil # Always overwritten + # # result.scopedBarrier = nil # Always overwritten + # result.prev = nil + # result.next = nil + # result.start = 0 + # result.cur = 0 + # result.stop = 0 + # result.stride = 0 + # result.futures = nil + # result.isLoop = false + # result.hasFuture = false proc delete*(task: Task) {.inline.} = preCondition: not task.isNil() @@ -145,35 +152,45 @@ proc flushAndDispose*(dq: var PrellDeque) = for task in items(leftovers): recycle(task) -# Pledges - Dataflow parallelism +# FlowEvent - Dataflow parallelism # ---------------------------------------------------------------------------------- -proc newPledge*(): Pledge = - ## Creates a pledge - ## Tasks associated with a pledge are only scheduled when the pledge is fulfilled. - ## A pledge can only be fulfilled once. - ## Pledges enable modeling precise producer-consumer data dependencies. +proc newFlowEvent*(): FlowEvent = + ## Creates a FlowEvent + ## Tasks associated with an event are only scheduled when the event is triggered. + ## An event can only be triggered once. + ## + ## FlowEvent enable modeling precise producer-consumer data dependencies + ## to implement dataflow parallelism and task graphs. result.initialize(myMemPool()) -proc newPledge*(start, stop, stride: SomeInteger): Pledge = - ## Creates a loop iteration pledge. - ## With a loop iteration pledge, tasks can be associated with a precise loop index. +proc newFlowEvent*(start, stop, stride: SomeInteger): FlowEvent = + ## Creates a loop iteration FlowEvent. + ## + ## With a loop iteration event, tasks can be associated with a precise loop index + ## or loop range. + ## It is strongly recommended to use loop tiling (also called loop blocking) + ## in combination with loop FlowEvent to process them in bulk and reduce overhead. + ## For example you can split a loop of size 1024 into tiles/blocks of 128 items + ## and associate only 8 events to the range instead of 1024. + ## + ## Tasks associated with an event are only scheduled when the event is triggered. + ## An event can only be triggered once. ## - ## Tasks associated with a pledge are only scheduled when the pledge is fulfilled. - ## A pledge can only be fulfilled once. - ## Pledges enable modeling precise producer-consumer data dependencies. + ## FlowEvent enable modeling precise producer-consumer data dependencies + ## to implement dataflow parallelism and task graphs. result.initialize(myMemPool(), start.int32, stop.int32, stride.int32) -proc fulfill*(pledge: Pledge) = - ## Fulfills a pledge - ## All ready tasks that depended on that pledge will be scheduled immediately. - ## A ready task is a task that has all its pledged dependencies fulfilled. - fulfillImpl(pledge, myWorker().deque, addFirst) - -proc fulfill*(pledge: Pledge, index: SomeInteger) = - ## Fulfills an iteration pledge - ## All ready tasks that depended on that pledge will be scheduled immediately. - ## A ready task is a task that has all its pledged dependencies fulfilled. - fulfillIterImpl(pledge, int32(index), myWorker().deque, addFirst) +proc trigger*(event: FlowEvent) = + ## Triggers an event + ## All ready tasks that depended on that event will be scheduled immediately. 
+ ## A ready task is a task that has all its event dependencies triggered. + triggerImpl(event, myWorker().deque, addFirst) + +proc trigger*(event: FlowEvent, index: SomeInteger) = + ## Triggers an iteration event + ## All ready tasks that depended on that event will be scheduled immediately. + ## A ready task is a task that has all its event dependencies triggered. + triggerIterImpl(event, int32(index), myWorker().deque, addFirst) # Dynamic Scopes # ---------------------------------------------------------------------------------- diff --git a/weave/cross_thread_com/channels_mpsc_unbounded_batch.nim b/weave/cross_thread_com/channels_mpsc_unbounded_batch.nim index 50ea3dcb..05223e83 100644 --- a/weave/cross_thread_com/channels_mpsc_unbounded_batch.nim +++ b/weave/cross_thread_com/channels_mpsc_unbounded_batch.nim @@ -56,7 +56,7 @@ type front{.align: MpscPadding.}: typeof(default(T)[]) # back and front order is chosen so that the data structure can be # made intrusive to consumer data-structures - # like the memory-pool and the pledges so that + # like the memory-pool and the events so that # producer accesses don't invalidate consumer cache-lines # # The padding is a compromise to keep back and front 1 cache line apart diff --git a/weave/cross_thread_com/pledges.nim b/weave/cross_thread_com/flow_events.nim similarity index 50% rename from weave/cross_thread_com/pledges.nim rename to weave/cross_thread_com/flow_events.nim index 440bdedd..ee018250 100644 --- a/weave/cross_thread_com/pledges.nim +++ b/weave/cross_thread_com/flow_events.nim @@ -12,14 +12,14 @@ import ./channels_mpsc_unbounded_batch, ../datatypes/sync_types, ../memory/[allocs, memory_pools], - ../instrumentation/contracts, + ../instrumentation/[contracts, loggers], ../config -# Pledges +# Flow Events # ---------------------------------------------------- -# Pledges are the counterpart to Flowvar. +# FlowEvents are the counterpart to Flowvars. # -# When a task depends on a pledge, it is delayed until the pledge is fulfilled +# When a task depends on an event, it is delayed until the event is triggered # This allows to model precise dependencies between tasks # beyond what traditional control-flow dependencies (function calls, barriers, locks) allow. # @@ -29,7 +29,7 @@ import # - expose the programmers to concurrency woes that could be avoided # by specifying precede/after relationship # -# This data availabity based parallelism is also called: +# This data-availability-based parallelism is also called: # - dataflow parallelism # - graph parallelism # - data-driven task parallelism @@ -42,52 +42,52 @@ import # Protocol (https://github.com/mratsim/weave/pull/92#issuecomment-570795718) # ---------------------------------------------------- # -# A pledge is an ownerless MPSC channel that holds tasks. +# A flow event is an ownerless MPSC channel that holds tasks. # The number of tasks in the channel is bounded by the number of dependent tasks -# When a worker fulfills a pledge, it becomes the unique consumer of the MPSC channel. -# It flags the pledge as fullfilled and drain the channel of all tasks. -# When a task is dependent on a pledge, the worker that received the dependent task -# checks the fulfilled flag. -# Case 1: It is fulfilled, it schedules the task as a normal task -# Case 2: It is not fulfilled, it sends the task in the pledge MPSC channel. +# When a worker triggers an event, it becomes the unique consumer of the MPSC channel. +# It flags the event as triggered and drain the channel of all tasks. 
+# When a task is dependent on an event, the worker that received the dependent task +# checks the triggered flag. +# Case 1: It is already triggered, it schedules the task as a normal task +# Case 2: It is not triggered, it sends the task in the flow event MPSC channel. # -# Tasks with multiple dependencies are represented by a list of pledges -# When a task is enqueued, it is sent to one of the unfulfilled pledge channel at random -# When that pledge is fulfilled, if all other pladges are fulfiled, it can be scheduled immediately -# Otherwise it is sent to one of the unfilfilled pledge at random. +# Tasks with multiple dependencies are represented by a linked list of events +# When a task is enqueued, it is sent to one of the untriggered event channels at random +# When that event is triggered, if all other events were also triggered, it can be scheduled immediately +# Otherwise it is sent to one of the untriggered events at random. # # Memory management is done through atomic reference counting. -# Pledges for loop iterations can use a single reference count for all iterations, -# however each iteration should have its pledge channel. This has a memory cost, -# users should be encouraged to use tiling/blocking. +# Events for loop iterations can use a single reference count for all iterations, +# however each iteration should have its event channel. This has a memory cost, +# users should be encouraged to use loop tiling/blocking. # # Mutual exclusion -# There is a race if a producer worker delivers on the pledge and a consumer -# checks the pledge status. -# In our case, the fulfilled flag is write-once, the producer worker only requires +# There is a race if a producer worker triggers an event and a consumer +# checks the event status. +# In our case, the triggered flag is write-once, the producer worker only requires # a way to know if a data race could have occurred. # -# The pledge keeps 2 monotonically increasing atomic count of consumers in and consumers out -# When a consumer checks the pledge: +# The event keeps 2 monotonically increasing atomic count of consumers in and consumers out +# When a consumer checks the event: # - it increments the consumer count "in" # - on exit it always increments the consumer count "out" -# - if it's fulfilled, increments the consumer count "out", +# - if it's triggered, increments the consumer count "out", # then exits and schedule the task itself # - if it's not, it enqueues the task # Then increments the count out # - The producer thread checks after draining all tasks that the consumer in/out counts are the same # otherwise it needs to drain again until it is sure that the consumer is out. # Keeping 2 count avoids the ABA problem. -# Pledges are allocated from memory pool blocks of size 2x WV_CacheLinePadding (256 bytes) +# Events are allocated from memory pool blocks of size 2x WV_CacheLinePadding (256 bytes) # with an intrusive MPSC channel # # Analysis: # This protocol avoids latency between when the data is ready and when the task is scheduled # exposing the maximum amount of parallelism. -# Proof: As soon as the pledge is fulfilled, any dependent tasks are scheduled -# or the task was not yet created. In tasks created late is scheduled by the creating worker. +# Proof: As soon as the event is triggered, any dependent tasks are scheduled +# or the task was not yet created. Tasks created late are scheduled immediately by the creating worker. # -# This protocol minimizes the number of message sent. 
There is at most 1 per dependencies unlike +# This protocol minimizes the number of messages sent. There is at most 1 per dependency unlike # a gossipsub, floodsub or episub approach which sends an exponential number of messages # and are sensitive to relayers' delays. # @@ -98,83 +98,73 @@ import # and also might require them to scan possibly long sequences of dependencies. # # This protocol avoids the need of multiple hash-tables or a graph-library -# to map Pledge=>seq[Task] and Task=>seq[Pledge] to quickly obtain -# all tasks that can be scheduled from a resolved pledge and -# to track the multiple dependencies a task can have. +# to map FlowEvent=>seq[Task] and Task=>seq[FlowEvent] to quickly obtain +# all tasks that can be scheduled from a triggered event and +# to track the multiple event dependencies a task can have. # # In particular this play well with the custom memory pool of Weave, unlike Nim sequences or hash-tables. # -# This protocol is cache friendly. The deferred task is co-located with the producer pledge. +# This protocol is cache friendly. The deferred task is co-located with the producer event. # When scheduled, it will use data hot in cache unless task is stolen but it can only be stolen if it's the # only task left due to LIFO work and FIFO thefts. # -# This protocol doesn't impose ordering on the producer and consumer (pledge fulfiller and pledge dependent task). +# This protocol doesn't impose ordering on the producer and consumer (event triggerer and event dependent task). # Other approaches might lead to missed messages unless they introduce state/memory, # which is always complex in "distributed" long-lived computations due to memory reclamation (hazard pointers, epoch-based reclamation, ...) type - Pledge* = object - ## A pledge represents a contract between - ## a producer task that fulfills or deliver on the pledge - ## and a consumer dependent task that is deferred until the pledge is fulfilled. + FlowEvent* = object + ## A flow event represents a contract between + ## a producer task that triggers or fires the event + ## and a consumer dependent task that is deferred until the event is triggered. ## - ## The difference with a Flowvar is that a Pledge represents + ## The difference with a Flowvar is that a FlowEvent represents ## a delayed input while a Flowvar represents a delayed result. ## - ## Pledge enables the following parallelism paradigm known under the following names: + ## FlowEvents enable the following parallelism paradigm known under the following names: ## - dataflow parallelism ## - graph parallelism ## - pipeline parallelism ## - data-driven task parallelism ## - stream parallelism - ## - ## In particular, this is the only way to implement a "barrier" compatible - ## with a work-stealing scheduler that can be composed and nested in parallel regions - ## that an unknown number of workers will execute. - p: PledgePtr + e: EventPtr TaskNode = ptr object ## Task Metadata. 
task*: Task - # Next task in the current pledge channel + # Next task in the current event channel next: Atomic[pointer] # Next task dependency if it has multiple nextDep*: TaskNode - pledge*: Pledge + event*: FlowEvent bucketID*: int32 - PledgeKind = enum + EventKind = enum Single Iteration - PledgeIter = object + EventIter = object numBuckets: int32 start, stop, stride: int32 - impls: ptr UncheckedArray[PledgeImpl] + singles: ptr UncheckedArray[SingleEvent] - PledgeUnion {.union.} = object - impl: PledgeImpl - iter: PledgeIter + EventUnion {.union.} = object + single: SingleEvent + iter: EventIter - PledgePtr = ptr object + EventPtr = ptr object refCount: Atomic[int32] - kind: PledgeKind - union: PledgeUnion - - PledgeImpl = object - # Issue: https://github.com/mratsim/weave/issues/93 - # TODO, the current MPSC channel always use a "count" field. - # Contrary to StealRequest and the remote freed memory in the memory pool, - # this is not needed, and atomics are expensive. - # It can be made optional with a useCount static bool. + kind: EventKind + union: EventUnion - # The MPSC Channel is intrusive to the PledgeImpl. + SingleEvent = object + # The MPSC Channel is intrusive to the SingleEvent. # The end fields in the channel should be the consumer - # to avoid cache-line conflicts with producer threads. + # to limit cache-line conflicts with producer threads. chan: ChannelMpscUnboundedBatch[TaskNode, keepCount = false] deferredIn: Atomic[int32] deferredOut: Atomic[int32] - fulfilled: Atomic[bool] + triggered: Atomic[bool] const NoIter* = -1 @@ -183,174 +173,170 @@ const NoIter* = -1 # Refcounting is started from 0 and we avoid fetchSub with release semantics # in the common case of only one reference being live. -proc `=destroy`*(pledge: var Pledge) = - if pledge.p.isNil: +proc `=destroy`*(event: var FlowEvent) = + if event.e.isNil: return - let count = pledge.p.refCount.load(moRelaxed) + let count = event.e.refCount.load(moRelaxed) fence(moAcquire) if count == 0: # We have the last reference - if not pledge.p.isNil: - if pledge.p.kind == Iteration: - wv_free(pledge.p.union.iter.impls) + if not event.e.isNil: + if event.e.kind == Iteration: + wv_free(event.e.union.iter.singles) # Return memory to the memory pool - recycle(pledge.p) + recycle(event.e) else: - discard fetchSub(pledge.p.refCount, 1, moRelease) - pledge.p = nil + discard fetchSub(event.e.refCount, 1, moRelease) + event.e = nil -proc `=sink`*(dst: var Pledge, src: Pledge) {.inline.} = +proc `=sink`*(dst: var FlowEvent, src: FlowEvent) {.inline.} = # Don't pay for atomic refcounting when compiler can prove there is no ref change - # `=destroy`(dst) # it seems like we can have non properly init types? - # with the pointer not being nil, but invalid as well - system.`=sink`(dst.p, src.p) + `=destroy`(dst) + system.`=sink`(dst.e, src.e) -proc `=`*(dst: var Pledge, src: Pledge) {.inline.} = - discard fetchAdd(src.p.refCount, 1, moRelaxed) - dst.p = src.p +proc `=`*(dst: var FlowEvent, src: FlowEvent) {.inline.} = + `=destroy`(dst) + discard fetchAdd(src.e.refCount, 1, moRelaxed) + dst.e = src.e -# Multi-Dependencies pledges +# Multi-Dependencies FlowEvents # ---------------------------------------------------- proc delayedUntilSingle(taskNode: TaskNode, curTask: Task): bool = - ## Redelay a task that depends on multiple pledges - ## with 1 or more pledge fulfilled but still some unfulfilled. + ## Redelay a task that depends on multiple events + ## with 1 or more events triggered but still some triggered. 
## ## Returns `true` if the task has been delayed. ## The task should not be accessed anymore by the current worker. ## Returns `false` if the task can be scheduled right away by the current worker thread. - preCondition: not taskNode.pledge.p.isNil - preCondition: taskNode.pledge.p.kind == Single + preCondition: not taskNode.event.e.isNil + preCondition: taskNode.event.e.kind == Single - if taskNode.pledge.p.union.impl.fulfilled.load(moRelaxed): + if taskNode.event.e.union.single.triggered.load(moRelaxed): fence(moAcquire) return false # Mutual exclusion / prevent races - discard taskNode.pledge.p.union.impl.deferredIn.fetchAdd(1, moRelaxed) + discard taskNode.event.e.union.single.deferredIn.fetchAdd(1, moRelaxed) - if taskNode.pledge.p.union.impl.fulfilled.load(moRelaxed): + if taskNode.event.e.union.single.triggered.load(moRelaxed): fence(moAcquire) - discard taskNode.pledge.p.union.impl.deferredOut.fetchAdd(1, moRelaxed) + discard taskNode.event.e.union.single.deferredOut.fetchAdd(1, moRelaxed) return false - # Send the task to the pledge fulfiller - discard taskNode.pledge.p.union.impl.chan.trySend(taskNode) - discard taskNode.pledge.p.union.impl.deferredOut.fetchAdd(1, moRelaxed) + # Send the task to the event triggerer + discard taskNode.event.e.union.single.chan.trySend(taskNode) + discard taskNode.event.e.union.single.deferredOut.fetchAdd(1, moRelaxed) return true proc delayedUntilIter(taskNode: TaskNode, curTask: Task): bool = - ## Redelay a task that depends on multiple pledges - ## with 1 or more pledge fulfilled but still some unfulfilled. + ## Redelay a task that depends on multiple events + ## with 1 or more events triggered but still some untriggered. ## ## Returns `true` if the task has been delayed. ## The task should not be accessed anymore by the current worker. ## Returns `false` if the task can be scheduled right away by the current worker thread. - preCondition: not taskNode.pledge.p.isNil + preCondition: not taskNode.event.e.isNil - if taskNode.pledge.p.union.iter.impls[taskNode.bucketID].fulfilled.load(moRelaxed): + if taskNode.event.e.union.iter.singles[taskNode.bucketID].triggered.load(moRelaxed): fence(moAcquire) return false # Mutual exclusion / prevent races - discard taskNode.pledge.p.union.iter.impls[taskNode.bucketID].deferredIn.fetchAdd(1, moRelaxed) + discard taskNode.event.e.union.iter.singles[taskNode.bucketID].deferredIn.fetchAdd(1, moRelaxed) - if taskNode.pledge.p.union.iter.impls[taskNode.bucketID].fulfilled.load(moRelaxed): + if taskNode.event.e.union.iter.singles[taskNode.bucketID].triggered.load(moRelaxed): fence(moAcquire) - discard taskNode.pledge.p.union.iter.impls[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed) + discard taskNode.event.e.union.iter.singles[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed) return false - # Send the task to the pledge fulfiller - discard taskNode.pledge.p.union.iter.impls[taskNode.bucketID].chan.trySend(taskNode) - discard taskNode.pledge.p.union.iter.impls[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed) + # Send the task to the event triggerer + discard taskNode.event.e.union.iter.singles[taskNode.bucketID].chan.trySend(taskNode) + discard taskNode.event.e.union.iter.singles[taskNode.bucketID].deferredOut.fetchAdd(1, moRelaxed) return true proc delayedUntil(taskNode: TaskNode, curTask: Task): bool = - ## Redelay a task that depends on multiple pledges (in the `taskNode` linked list) - ## with 1 or more pledge fulfilled but still some unfulfilled. 
+ ## Redelay a task that depends on multiple events (in the `taskNode` linked list) + ## with 1 or more events triggered but still some untriggered. ## ## Returns `true` if the task has been delayed. ## The task should not be accessed anymore by the current worker. ## Returns `false` if the task can be scheduled right away by the current worker thread. - preCondition: not taskNode.pledge.p.isNil + preCondition: not taskNode.event.e.isNil preCondition: bool(taskNode.task == curTask) - if taskNode.pledge.p.kind == Single: + if taskNode.event.e.kind == Single: delayedUntilSingle(taskNode, curTask) else: delayedUntilIter(taskNode, curTask) -# Public - single task pledge +# Public - single task event # ---------------------------------------------------- -proc initialize*(pledge: var Pledge, pool: var TLPoolAllocator) = - ## Initialize a pledge. - ## Tasks can depend on a pledge and in that case their scheduling - ## will be delayed until that pledge is fulfilled. +proc initialize*(event: var FlowEvent, pool: var TLPoolAllocator) = + ## Initialize a FlowEvent. + ## Tasks can depend on an event and in that case their scheduling + ## will be delayed until that event is triggered. ## This allows modelling precise data dependencies. - preCondition: pledge.p.isNil - pledge.p = pool.borrow(deref(PledgePtr)) - zeroMem(pledge.p, sizeof(deref(PledgePtr))) # We start the refCount at 0 - # TODO: mempooled MPSC channel https://github.com/mratsim/weave/issues/93 - pledge.p.kind = Single - pledge.p.union.impl.chan.initialize() - -proc delayedUntil*(task: Task, pledge: Pledge, pool: var TLPoolAllocator): bool = - ## Defers a task until a pledge is fulfilled + preCondition: event.e.isNil + event.e = pool.borrow0(deref(EventPtr)) + event.e.kind = Single + event.e.union.single.chan.initialize() + +proc delayedUntil*(task: Task, event: FlowEvent, pool: var TLPoolAllocator): bool = + ## Defers a task until an event is triggered ## ## Returns `true` if the task has been delayed. ## The task should not be accessed anymore by the current worker. ## Returns `false` if the task can be scheduled right away by the current worker thread. 
- preCondition: not pledge.p.isNil - preCondition: pledge.p.kind == Single + preCondition: not event.e.isNil + preCondition: event.e.kind == Single # Optimization to avoid paying the cost of atomics - if pledge.p.union.impl.fulfilled.load(moRelaxed): + if event.e.union.single.triggered.load(moRelaxed): fence(moAcquire) return false # Mutual exclusion / prevent races - discard pledge.p.union.impl.deferredIn.fetchAdd(1, moRelaxed) + discard event.e.union.single.deferredIn.fetchAdd(1, moRelaxed) - if pledge.p.union.impl.fulfilled.load(moRelaxed): + if event.e.union.single.triggered.load(moRelaxed): fence(moAcquire) - discard pledge.p.union.impl.deferredOut.fetchAdd(1, moRelaxed) + discard event.e.union.single.deferredOut.fetchAdd(1, moRelaxed) return false - # Send the task to the pledge fulfiller - let taskNode = pool.borrow(deref(TaskNode)) + # Send the task to the event triggerer + let taskNode = pool.borrow0(deref(TaskNode)) taskNode.task = task - taskNode.next.store(nil, moRelaxed) - taskNode.nextDep = nil - taskNode.pledge.p = nil # Don't need to store the pledge reference if there is only the current one taskNode.bucketID = NoIter - discard pledge.p.union.impl.chan.trySend(taskNode) - discard pledge.p.union.impl.deferredOut.fetchAdd(1, moRelaxed) + # Don't need to store the event reference if there is only the current one + discard event.e.union.single.chan.trySend(taskNode) + discard event.e.union.single.deferredOut.fetchAdd(1, moRelaxed) return true -template fulfillImpl*(pledge: Pledge, queue, enqueue: typed) = - ## A producer thread fulfills a pledge. - ## A pledge can only be fulfilled once. - ## A producer will immediately scheduled all tasks dependent on that pledge - ## unless they also depend on another unfulfilled pledge. +template triggerImpl*(event: FlowEvent, queue, enqueue: typed) = + ## A producer thread triggers an event. + ## An event can only be triggered once. + ## A producer will immediately scheduled all tasks dependent on that event + ## unless they also depend on another untriggered event. ## Dependent tasks scheduled at a later time will be scheduled immediately ## ## `queue` is the data structure for ready tasks ## `enqueue` is the correspondig enqueing proc ## This should be wrapped in a proc to avoid code-bloat as the template is big - preCondition: not pledge.p.isNil - preCondition: pledge.p.kind == Single - preCondition: not load(pledge.p.union.impl.fulfilled, moRelaxed) + preCondition: not event.e.isNil + preCondition: event.e.kind == Single + preCondition: not load(event.e.union.single.triggered, moRelaxed) - # Lock the pledge, new tasks should be scheduled right away + # Lock the event, new tasks should be scheduled right away fence(moRelease) - store(pledge.p.union.impl.fulfilled, true, moRelaxed) + store(event.e.union.single.triggered, true, moRelaxed) # TODO: some state machine here? 
while true: var task: Task var taskNode: TaskNode - while pledge.p.union.impl.chan.tryRecv(taskNode): + while event.e.union.single.chan.tryRecv(taskNode): ascertain: not taskNode.isNil ascertain: taskNode.bucketID == NoIter task = taskNode.task @@ -366,102 +352,101 @@ template fulfillImpl*(pledge: Pledge, queue, enqueue: typed) = enqueue(queue, task) recycle(taskNode) - if load(pledge.p.union.impl.deferredOut, moAcquire) != load(pledge.p.union.impl.deferredIn, moAcquire): + # Restart if an event producer didn't finish delaying a task + if load(event.e.union.single.deferredOut, moAcquire) != load(event.e.union.single.deferredIn, moAcquire): cpuRelax() else: break -# Public - iteration task pledge +# Public - iteration task event # ---------------------------------------------------- -proc initialize*(pledge: var Pledge, pool: var TLPoolAllocator, start, stop, stride: int32) = - ## Initialize a pledge for iteration tasks +proc initialize*(event: var FlowEvent, pool: var TLPoolAllocator, start, stop, stride: int32) = + ## Initialize an event for iteration tasks preCondition: stop > start preCondition: stride > 0 - preCondition: pledge.p.isNil + preCondition: event.e.isNil - pledge.p = pool.borrow(deref(PledgePtr)) + event.e = pool.borrow0(deref(EventPtr)) # We start refcount at 0 - pledge.p.kind = Iteration - pledge.p.union.iter.numBuckets = (stop - start + stride-1) div stride - pledge.p.union.iter.start = start - pledge.p.union.iter.stop = stop - pledge.p.union.iter.stride = stride + event.e.kind = Iteration + event.e.union.iter.numBuckets = (stop - start + stride-1) div stride + event.e.union.iter.start = start + event.e.union.iter.stop = stop + event.e.union.iter.stride = stride # The mempool doesn't support arrays - pledge.p.union.iter.impls = wv_alloc(PledgeImpl, pledge.p.union.iter.numBuckets) - zeroMem(pledge.p.union.iter.impls, pledge.p.union.iter.numBuckets * sizeof(PledgeImpl)) + event.e.union.iter.singles = wv_alloc(SingleEvent, event.e.union.iter.numBuckets) + zeroMem(event.e.union.iter.singles, event.e.union.iter.numBuckets * sizeof(SingleEvent)) - for i in 0 ..< pledge.p.union.iter.numBuckets: - pledge.p.union.iter.impls[i].chan.initialize() + for i in 0 ..< event.e.union.iter.numBuckets: + event.e.union.iter.singles[i].chan.initialize() -proc getBucket(pledge: Pledge, index: int32): int32 {.inline.} = +proc getBucket(event: FlowEvent, index: int32): int32 {.inline.} = ## Convert a possibly offset and/or strided for-loop iteration index - ## to a pledge bucket in the range [0, numBuckets) - preCondition: index in pledge.p.union.iter.start ..< pledge.p.union.iter.stop - result = (index - pledge.p.union.iter.start) div pledge.p.union.iter.stride + ## to an event bucket in the range [0, numBuckets) + preCondition: index in event.e.union.iter.start ..< event.e.union.iter.stop + result = (index - event.e.union.iter.start) div event.e.union.iter.stride -proc delayedUntil*(task: Task, pledge: Pledge, index: int32, pool: var TLPoolAllocator): bool = - ## Defers a task until a pledge[index] is fulfilled +proc delayedUntil*(task: Task, event: FlowEvent, index: int32, pool: var TLPoolAllocator): bool = + ## Defers a task until an event[index] is triggered ## ## Returns `true` if the task has been delayed. ## The task should not be accessed anymore by the current worker. ## Returns `false` if the task can be scheduled right away by the current worker thread. 
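  # Same check/announce/re-check protocol as the single-event `delayedUntil`
  # above, but applied per bucket: the iteration index is first mapped to its
  # bucket with `getBucket`.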
- preCondition: not pledge.p.isNil - preCondition: pledge.p.kind == Iteration + preCondition: not event.e.isNil + preCondition: event.e.kind == Iteration - let bucket = pledge.getBucket(index) + let bucket = event.getBucket(index) # Optimization to avoid paying the cost of atomics - if pledge.p.union.iter.impls[bucket].fulfilled.load(moRelaxed): + if event.e.union.iter.singles[bucket].triggered.load(moRelaxed): fence(moAcquire) return false # Mutual exclusion / prevent races - discard pledge.p.union.iter.impls[bucket].deferredIn.fetchAdd(1, moRelaxed) + discard event.e.union.iter.singles[bucket].deferredIn.fetchAdd(1, moRelaxed) - if pledge.p.union.iter.impls[bucket].fulfilled.load(moRelaxed): + if event.e.union.iter.singles[bucket].triggered.load(moRelaxed): fence(moAcquire) - discard pledge.p.union.iter.impls[bucket].deferredOut.fetchAdd(1, moRelaxed) + discard event.e.union.iter.singles[bucket].deferredOut.fetchAdd(1, moRelaxed) return false - # Send the task to the pledge fulfiller - let taskNode = pool.borrow(deref(TaskNode)) + # Send the task to the event triggerer + let taskNode = pool.borrow0(deref(TaskNode)) taskNode.task = task - taskNode.next.store(nil, moRelaxed) - taskNode.nextDep = nil - taskNode.pledge = default(Pledge) # Don't need to store the pledge reference if there is only the current one + # Don't need to store the event reference if there is only the current one taskNode.bucketID = bucket - discard pledge.p.union.iter.impls[bucket].chan.trySend(taskNode) - discard pledge.p.union.iter.impls[bucket].deferredOut.fetchAdd(1, moRelaxed) + discard event.e.union.iter.singles[bucket].chan.trySend(taskNode) + discard event.e.union.iter.singles[bucket].deferredOut.fetchAdd(1, moRelaxed) return true -template fulfillIterImpl*(pledge: Pledge, index: int32, queue, enqueue: typed) = - ## A producer thread fulfills a pledge. - ## A pledge can only be fulfilled once. - ## A producer will immediately scheduled all tasks dependent on that pledge - ## unless they also depend on another unfulfilled pledge. +template triggerIterImpl*(event: FlowEvent, index: int32, queue, enqueue: typed) = + ## A producer thread triggers an event. + ## An event can only be triggered once. + ## A producer will immediately scheduled all tasks dependent on that event + ## unless they also depend on another untriggered event. ## Dependent tasks scheduled at a later time will be scheduled immediately ## ## `queue` is the data structure for ready tasks ## `enqueue` is the correspondig enqueing proc ## This should be wrapped in a proc to avoid code-bloat as the template is big - preCondition: not pledge.p.isNil - preCondition: pledge.p.kind == Iteration + preCondition: not event.e.isNil + preCondition: event.e.kind == Iteration - let bucket = getBucket(pledge, index) - preCondition: not load(pledge.p.union.iter.impls[bucket].fulfilled, moRelaxed) + let bucket = getBucket(event, index) + preCondition: not load(event.e.union.iter.singles[bucket].triggered, moRelaxed) - # Lock the pledge, new tasks should be scheduled right away + # Lock the event, new tasks should be scheduled right away fence(moRelease) - store(pledge.p.union.iter.impls[bucket].fulfilled, true, moRelaxed) + store(event.e.union.iter.singles[bucket].triggered, true, moRelaxed) # TODO: some state machine here? 
while true: var task {.inject.}: Task var taskNode: TaskNode - while pledge.p.union.iter.impls[bucket].chan.tryRecv(taskNode): + while event.e.union.iter.singles[bucket].chan.tryRecv(taskNode): ascertain: not taskNode.isNil ascertain: taskNode.bucketID != NoIter task = taskNode.task @@ -477,7 +462,8 @@ template fulfillIterImpl*(pledge: Pledge, index: int32, queue, enqueue: typed) = enqueue(queue, task) recycle(taskNode) - if load(pledge.p.union.iter.impls[bucket].deferredOut, moAcquire) != load(pledge.p.union.iter.impls[bucket].deferredIn, moAcquire): + # Restart if an event producer didn't finish delaying a task + if load(event.e.union.iter.singles[bucket].deferredOut, moAcquire) != load(event.e.union.iter.singles[bucket].deferredIn, moAcquire): cpuRelax() else: break @@ -485,31 +471,31 @@ template fulfillIterImpl*(pledge: Pledge, index: int32, queue, enqueue: typed) = # Multiple dependencies # ------------------------------------------------------------------------------ -macro delayedUntilMulti*(task: Task, pool: var TLPoolAllocator, pledges: varargs[untyped]): untyped = +macro delayedUntilMulti*(task: Task, pool: var TLPoolAllocator, events: varargs[untyped]): untyped = ## Associate a task with multiple dependencies result = newStmtList() var taskNodesInitStmt = newStmtList() var firstNode, prevNode: NimNode - for i in 0 ..< pledges.len: + for i in 0 ..< events.len: var taskNode: NimNode var taskNodeInit = newStmtList() - if pledges[i].kind == nnkPar: - let pledge = pledges[i][0] - taskNode = genSym(nskLet, "taskNode_" & $pledge[i] & "_" & $pledges[i][1] & "_") - let bucket = newCall(bindSym"getBucket", pledge, pledges[i][1]) + if events[i].kind == nnkPar: + let event = events[i][0] + taskNode = genSym(nskLet, "taskNode_" & $event[i] & "_" & $events[i][1] & "_") + let bucket = newCall(bindSym"getBucket", event, events[i][1]) taskNodeInit.add quote do: - let `taskNode` = borrow(`pool`, deref(TaskNode)) + let `taskNode` = borrow0(`pool`, deref(TaskNode)) `taskNode`.task = `task` - `taskNode`.pledge = `pledge` + `taskNode`.event = `event` `taskNode`.bucketID = `bucket` else: - taskNode = genSym(nskLet, "taskNode_" & $pledges[i] & "_") - let pledge = pledges[i] + taskNode = genSym(nskLet, "taskNode_" & $events[i] & "_") + let event = events[i] taskNodeInit.add quote do: - let `taskNode` = borrow(`pool`, deref(TaskNode)) + let `taskNode` = borrow0(`pool`, deref(TaskNode)) `taskNode`.task = `task` - `taskNode`.pledge = `pledge` + `taskNode`.event = `event` `taskNode`.bucketID = NoIter if i != 0: taskNodeInit.add quote do: @@ -538,11 +524,11 @@ debugSizeAsserts: doAssert sizeof(ChannelMpscUnboundedBatch[TaskNode, false]) == 128, "MPSC channel size was " & $sizeof(ChannelMpscUnboundedBatch[TaskNode, false]) - doAssert sizeof(PledgeImpl) == 192, - "PledgeImpl size was " & $sizeof(PledgeImpl) + doAssert sizeof(SingleEvent) == 192, + "SingleEvent size was " & $sizeof(SingleEvent) - doAssert sizeof(default(PledgePtr)[]) <= WV_MemBlockSize, - "PledgePtr object size was " & $sizeof(default(PledgePtr)[]) + doAssert sizeof(default(EventPtr)[]) <= WV_MemBlockSize, + "EventPtr object size was " & $sizeof(default(EventPtr)[]) when isMainModule: type TaskStack = object @@ -569,87 +555,87 @@ when isMainModule: proc mainSingle() = var stack: TaskStack - var pledge1: Pledge - pledge1.initialize(pool) - block: # Pledge 1 + var event1: FlowEvent + event1.initialize(pool) + block: # FlowEvent 1 let task = wv_allocPtr(Task, zero = true) - let delayed = task.delayedUntil(pledge1, pool) + let delayed = 
task.delayedUntil(event1, pool) doAssert delayed doAssert stack.count == 0 - pledge1.fulfillImpl(stack, add) + event1.triggerImpl(stack, add) doAssert stack.count == 1 - block: # Pledge 1 - late + block: # FlowEvent 1 - late let task = wv_allocPtr(Task, zero = true) - let delayed = task.delayedUntil(pledge1, pool) + let delayed = task.delayedUntil(event1, pool) doAssert not delayed doAssert stack.count == 1 # enqueuing is left as an exercise to the late thread. - var pledge2: Pledge - pledge2.initialize(pool) + var event2: FlowEvent + event2.initialize(pool) block: block: let task = wv_allocPtr(Task, zero = true) - let delayed = task.delayedUntil(pledge2, pool) + let delayed = task.delayedUntil(event2, pool) doAssert delayed block: let task = wv_allocPtr(Task, zero = true) - let delayed = task.delayedUntil(pledge2, pool) + let delayed = task.delayedUntil(event2, pool) doAssert delayed doAssert stack.count == 1 - pledge2.fulfillImpl(stack, add) + event2.triggerImpl(stack, add) doAssert stack.count == 3 - echo "Simple pledge: SUCCESS" + echo "Simple event: SUCCESS" mainSingle() proc mainLoop() = var stack: TaskStack - var pledge1: Pledge - pledge1.initialize(pool, 0, 10, 1) - block: # Pledge 1 + var event1: FlowEvent + event1.initialize(pool, 0, 10, 1) + block: # FlowEvent 1 let task = wv_allocPtr(Task, zero = true) - let delayed = task.delayedUntil(pledge1, 3, pool) + let delayed = task.delayedUntil(event1, 3, pool) doAssert delayed doAssert stack.count == 0 - pledge1.fulfillIterImpl(3, stack, add) + event1.triggerIterImpl(3, stack, add) doAssert stack.count == 1 - block: # Pledge 1 - late + block: # FlowEvent 1 - late let task = wv_allocPtr(Task, zero = true) - let delayed = task.delayedUntil(pledge1, 3, pool) + let delayed = task.delayedUntil(event1, 3, pool) doAssert not delayed doAssert stack.count == 1 # enqueuing is left as an exercise to the late thread. 
diff --git a/weave/datatypes/prell_deques.nim b/weave/datatypes/prell_deques.nim
index c535d66c..9a2142e1 100644
--- a/weave/datatypes/prell_deques.nim
+++ b/weave/datatypes/prell_deques.nim
@@ -257,8 +257,7 @@ when isMainModule:
     var taskID{.global.} = 1
     result = cache.pop()
     if result.isNil:
-      result = pool.borrow(deref(Task))
-      zeroMem(result, sizeof(deref(Task)))
+      result = pool.borrow0(deref(Task))
     result.fn = cast[type result.fn](taskID)
     taskID += 1
 
diff --git a/weave/executor.nim b/weave/executor.nim
index 1eaafb31..532286ac 100644
--- a/weave/executor.nim
+++ b/weave/executor.nim
@@ -52,7 +52,7 @@ import
   ./contexts, ./config, ./runtime,
   ./datatypes/[sync_types, context_thread_local],
   ./instrumentation/[contracts, loggers],
-  ./cross_thread_com/[scoped_barriers, pledges, channels_mpsc_unbounded_batch, event_notifiers],
+  ./cross_thread_com/[event_notifiers, channels_mpsc_unbounded_batch],
   ./state_machines/sync_root
 
 {.push gcsafe, inline.} # TODO raises: []
 
diff --git a/weave/memory/lookaside_lists.nim b/weave/memory/lookaside_lists.nim
index 00aedd2a..43eafb14 100644
--- a/weave/memory/lookaside_lists.nim
+++ b/weave/memory/lookaside_lists.nim
@@ -92,6 +92,14 @@ func pop*[T](lal: var LookAsideList[T]): T {.inline.} =
 
   result = lal.popImpl()
 
+func pop0*[T](lal: var LookAsideList[T]): T {.inline.} =
+  ## Pop a node and zero-initialize it
+  ## As this is inline, the compiler can elide the zero-initialization
+  ## when it makes sense.
+  result = lal.pop()
+  if not result.isNil:
+    zeroMem(result, sizeof(deref(T)))
+
 proc delete*[T](lal: var LookAsideList[T]) {.gcsafe.} =
   if not lal.registeredAt.isNil:
     lal.registeredAt.onHeartBeat = nil
 
diff --git a/weave/memory/memory_pools.nim b/weave/memory/memory_pools.nim
index 549cb112..d7a04d2a 100644
--- a/weave/memory/memory_pools.nim
+++ b/weave/memory/memory_pools.nim
@@ -507,6 +507,18 @@ proc borrow*(pool: var TLPoolAllocator, T: typedesc): ptr T =
     # Fast-path
     return cast[ptr T](pool.last[].allocBlock())
 
+proc borrow0*(pool: var TLPoolAllocator, T: typedesc): ptr T {.inline.} =
+  ## Provides an unused memory block of size
+  ## WV_MemBlockSize (256 bytes)
+  ##
+  ## The object is zero-initialized.
+  ## As this is inline, the compiler can elide the zero-initialization
+  ## when it makes sense.
+  ##
+  ## If the underlying pool runs out-of-memory, it will reserve more from the OS.
+  result = pool.borrow(T)
+  zeroMem(result, sizeof(T))
+
 proc recycle*[T](p: ptr T) {.gcsafe.} =
   ## Returns a memory block to its memory pool.
   ##
@@ -654,7 +666,7 @@ when isMainModule:
 
     for i in 0 ..< Iters:
       for j in 0 ..< NumAllocs:
-        pointers[j] = pool.borrow(MyObject)
+        pointers[j] = pool.borrow0(MyObject)
 
       # Deallocate in mixed order - note that the mempool
       # is optimized for LIFO dealloc.
       for j in countup(0, NumAllocs-1, 2):
@@ -766,7 +778,7 @@ when isMainModule:
   else: # workaround sizeof atomics
     assert sizeof(ValObj) == 16
-    cast[Val](pool[].borrow(array[16, byte]))
+    cast[Val](pool[].borrow0(array[16, byte]))
 
 template valFree(kind: static AllocKind, val: Val) =
   when kind == System:
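`borrow0` and the lookaside-list `pop0` above are thin zero-initializing wrappers over the existing allocation paths; because they are `{.inline.}`, the zeroing can be elided when the caller immediately overwrites every field anyway. A hedged usage sketch follows: it assumes the pool has already been initialized the way the module's own `isMainModule` tests do, and `MyObject` is a stand-in for any flat object that fits in one `WV_MemBlockSize` block:

```Nim
import weave/memory/memory_pools

type MyObject = object
  x, y: int64

proc demo(pool: var TLPoolAllocator) =
  ## `pool` must be an already-initialized thread-local allocator
  ## (setup omitted here; see the module's own tests).
  let p = pool.borrow0(MyObject) # zero-initialized memory block
  assert p.x == 0 and p.y == 0
  recycle(p)                     # hand the block back to its owning pool
```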
diff --git a/weave/parallel_for.nim b/weave/parallel_for.nim
index 79b95dd0..a4afeae2 100644
--- a/weave/parallel_for.nim
+++ b/weave/parallel_for.nim
@@ -109,16 +109,16 @@ template parallelForAwaitableWrapper(
     # 2 deallocs for eager FV and 3 for Lazy FV
     recycleFVN(fvNode)
 
-proc parallelForSplitted(index, start, stop, stride, captured, capturedTy, dependsOn, body: NimNode): NimNode =
-  ## In case a parallelFor depends on iteration pledge indexed by the loop variable
+proc parallelForSplitted(index, start, stop, stride, captured, capturedTy, dependsOnEvent, body: NimNode): NimNode =
+  ## In case a parallelFor depends on an iteration event indexed by the loop variable
   ## we can't use regular parallel loop with lazy splitting
   ## we need to split the loop eagerly so that each iterations can be started independently
-  ## as soon as the corresponding iteration pledge is fulfilled.
+  ## as soon as the corresponding iteration event is triggered.
   ## In that case, the loop cannot have futures.
 
   result = newStmtList()
   let parForSplitted = ident("weaveTask_DelayedParForSplit_")
-  let pledge = dependsOn[0]
+  let event = dependsOnEvent[0]
 
   if captured.len > 0:
     let captured = if captured.len > 1: captured
@@ -130,14 +130,14 @@ proc parallelForSplitted(index, start, stop, stride, captured, capturedTy, depen
         `body`
 
       for `index` in countup(`start`, `stop`-1, `stride`):
-        spawnDelayed((`pledge`, `index`), `parForSplitted`(`index`, `captured`))
+        spawnOnEvents((`event`, `index`), `parForSplitted`(`index`, `captured`))
   else:
     result.add quote do:
       proc `parForSplitted`(`index`: SomeInteger) {.nimcall, gcsafe.} =
         `body`
 
       for `index` in countup(`start`, `stop`-1, `stride`):
-        spawnDelayed((`pledge`, `index`), `parForSplitted`(`index`))
+        spawnOnEvents((`event`, `index`), `parForSplitted`(`index`))
 
 macro parallelForImpl(loopParams: untyped, stride: int, body: untyped): untyped =
   ## Parallel for loop
@@ -176,15 +176,15 @@ macro parallelForImpl(loopParams: untyped, stride: int, body: untyped): untyped
 
   # Pledges
   # --------------------------------------------------------
-  # TODO: support multiple pledges
-  let dependsOn = extractPledges(body)
+  # TODO: support multiple events
+  let dependsOnEvent = extractEvents(body)
 
   # If the input dependencies depends on the loop index
   # we need to eagerly split our lazily scheduled loop
   # as iterations cannot be scheduled at the same type
   # It also cannot be awaited with regular sync.
-  if dependsOn.kind == nnkPar and dependsOn[1].eqIdent(idx):
-    return parallelForSplitted(idx, start, stop, stride, captured, capturedTy, dependsOn, body)
+  if dependsOnEvent.kind == nnkPar and dependsOnEvent[1].eqIdent(idx):
+    return parallelForSplitted(idx, start, stop, stride, captured, capturedTy, dependsOnEvent, body)
 
   # Package the body in a proc
   # --------------------------------------------------------
@@ -245,7 +245,7 @@ macro parallelForImpl(loopParams: untyped, stride: int, body: untyped): untyped
   # --------------------------------------------------------
   result.addLoopTask(
     parForTask, start, stop, stride, captured, CapturedTy,
-    dependsOn,
+    dependsOnEvent,
     futureIdent = future, resultFutureType = futTy
   )
 
@@ -284,7 +284,7 @@ macro parallelFor*(loopParams: untyped, body: untyped): untyped =
   ## This is useful to have conditional execution (for example fulfilling a data dependency)
   ## on nested loops.
 
-  # TODO - support pledge in reduction
+  # TODO - support event in reduction
   if (body[0].kind == nnkCall and body[0][0].eqIdent"reduce") or
      (body.len >= 2 and
       body[1].kind == nnkCall and body[1][0].eqIdent"reduce"):
@@ -405,24 +405,24 @@ when isMainModule:
 
   proc main6() =
     init(Weave)
-    let pA = newPledge(0, 10, 1)
-    let pB = newPledge(0, 10, 1)
+    let eA = newFlowEvent(0, 10, 1)
+    let eB = newFlowEvent(0, 10, 1)
     parallelFor i in 0 ..< 10:
-      captures: {pA}
+      captures: {eA}
       sleep(i * 10)
-      pA.fulfill(i)
+      eA.trigger(i)
       echo "Step A - stream ", i, " at ", i * 10, " ms"
 
     parallelFor i in 0 ..< 10:
-      dependsOn: (pA, i)
-      captures: {pB}
+      dependsOnEvent: (eA, i)
+      captures: {eB}
       sleep(i * 10)
-      pB.fulfill(i)
+      eB.trigger(i)
       echo "Step B - stream ", i, " at ", 2 * i * 10, " ms"
 
     parallelFor i in 0 ..< 10:
-      dependsOn: (pB, i)
+      dependsOnEvent: (eB, i)
       sleep(i * 10)
       echo "Step C - stream ", i, " at ", 3 * i * 10, " ms"
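For a loop such as `Step B` above, whose `dependsOnEvent` tuple uses the loop index, `parallelForSplitted` expands the loop eagerly: every iteration becomes its own event-gated spawn. A simplified, illustrative sketch of that shape; `stepBody` is an invented name, and the real expansion uses hygienic identifiers and forwards the captures as parameters:

```Nim
import weave

proc stepBody(i: SomeInteger) {.nimcall, gcsafe.} =
  echo "iteration ", i, " unblocked"

init(Weave)
let e = newFlowEvent(0, 10, 1)       # one event slot per iteration in 0 ..< 10
for i in countup(0, 10 - 1, 1):
  spawnOnEvents((e, i), stepBody(i)) # park iteration i until (e, i) fires
for i in 0 ..< 10:
  e.trigger(i)                       # release the iterations one by one
exit(Weave)
```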
diff --git a/weave/parallel_jobs.nim b/weave/parallel_jobs.nim
index 536ccde9..2c610160 100644
--- a/weave/parallel_jobs.nim
+++ b/weave/parallel_jobs.nim
@@ -20,30 +20,39 @@ import
   ./scheduler, ./contexts, ./config,
   ./datatypes/[flowvars, sync_types],
   ./instrumentation/contracts,
-  ./cross_thread_com/[pledges, channels_mpsc_unbounded_batch, event_notifiers],
+  ./cross_thread_com/[flow_events, channels_mpsc_unbounded_batch, event_notifiers],
   ./state_machines/sync_root,
   ./executor
 
 proc newJob(): Job {.inline.} =
-  result = jobProviderContext.mempool[].borrow(deref(Job))
-  # result.fn = nil # Always overwritten
-  # result.parent = nil # Always overwritten
-  result.scopedBarrier = nil # Always overwritten
-  result.prev = nil
-  result.next.store(nil, moRelaxed)
-  result.start = 0
-  result.cur = 0
-  result.stop = 0
-  result.stride = 0
-  result.futures = nil
-  result.isLoop = false
-  result.hasFuture = false
+  result = jobProviderContext.mempool[].borrow0(deref(Job))
+  # The job must be fully zeroed, including the data buffer,
+  # otherwise datatypes that use custom destructors
+  # and that rely on "myPointer.isNil" to return early
+  # may read recycled garbage data.
+  # "FlowEvent" is such an example.
+
+  # TODO: the perf cost of zeroing the full job is 17% as measured on fib(40)
+
+  # # Zeroing is expensive, it's 96 bytes
+  # # result.fn = nil # Always overwritten
+  # # result.parent = nil # Always overwritten
+  # # result.scopedBarrier = nil # Always overwritten
+  # result.prev = nil
+  # result.next = nil
+  # result.start = 0
+  # result.cur = 0
+  # result.stop = 0
+  # result.stride = 0
+  # result.futures = nil
+  # result.isLoop = false
+  # result.hasFuture = false
 
 proc notifyJob() {.inline.} =
   Backoff:
     manager.jobNotifier[].notify()
 
-proc submitImpl(pledges: NimNode, funcCall: NimNode): NimNode =
+proc submitImpl(events: NimNode, funcCall: NimNode): NimNode =
   # We take typed argument so that overloading resolution
   # is already done and arguments are semchecked
   funcCall.expectKind(nnkCall)
@@ -91,20 +100,20 @@ proc submitImpl(pledges: NimNode, funcCall: NimNode): NimNode =
   # Submit immediately or delay on dependencies
   var submitBlock: NimNode
   let job = ident"job"
-  if pledges.isNil:
+  if events.isNil:
     submitBlock = newCall(bindSym"submitJob", job)
-  elif pledges.len == 1:
-    let pledgeDesc = pledges[0]
-    if pledgeDesc.kind in {nnkIdent, nnkSym}:
+  elif events.len == 1:
+    let eventDesc = events[0]
+    if eventDesc.kind in {nnkIdent, nnkSym}:
       submitBlock = quote do:
-        if not delayedUntil(cast[Task](`job`), `pledgeDesc`, jobProviderContext.mempool[]):
+        if not delayedUntil(cast[Task](`job`), `eventDesc`, jobProviderContext.mempool[]):
           submitJob(`job`)
     else:
-      pledgeDesc.expectKind({nnkPar, nnkTupleConstr})
-      let pledge = pledgeDesc[0]
-      let pledgeIndex = pledgeDesc[1]
+      eventDesc.expectKind({nnkPar, nnkTupleConstr})
+      let event = eventDesc[0]
+      let eventIndex = eventDesc[1]
       submitBlock = quote do:
-        if not delayedUntil(cast[Task](`job`), `pledge`, int32(`pledgeIndex`), myMemPool()):
+        if not delayedUntil(cast[Task](`job`), `event`, int32(`eventIndex`), myMemPool()):
           submitJob(`job`)
   else:
     let delayedMulti = getAst(
@@ -113,7 +122,7 @@ proc submitImpl(pledges: NimNode, funcCall: NimNode): NimNode =
         nnkDerefExpr.newTree(
           nnkDotExpr.newTree(bindSym"jobProviderContext", ident"mempool")
         ),
-        pledges
+        events
       )
     )
     submitBlock = quote do:
@@ -242,11 +251,31 @@ macro submit*(fnCall: typed): untyped =
   ## Jobs are processed approximately in First-In-First-Out (FIFO) order.
   result = submitImpl(nil, fnCall)
 
-macro submitDelayed*(pledges: varargs[typed], fnCall: typed): untyped =
+macro submitOnEvents*(events: varargs[typed], fnCall: typed): untyped =
   ## Submit the input function call asynchronously to the Weave runtime.
-  ## The function call will only be scheduled when the pledge is fulfilled.
+  ## The function call will only be scheduled when the event is triggered.
   ##
-  ## This is a compatibility routine for foreign threads.
+  ## This is a compatibility routine for threads foreign to Weave (i.e. neither the root thread nor a worker thread).
+  ## `setupSubmitterThread` MUST be called on the submitter thread beforehand
+  ##
+  ## This procedure is intended for interoperability with long-running threads
+  ## started with `createThread`
+  ## and other threadpools and/or execution engines,
+  ## use `spawn` otherwise.
+  ##
+  ## If the function call returns a result, submit will wrap it in a Pending[T].
+  ## You can use `waitFor` to block the current thread and extract the asynchronous result from the Pending[T].
+  ## You can use `isReady` to check if result is available and if subsequent
+  ## `waitFor` calls would block or return immediately.
+  ##
+  ## Ensure that before waiting on the Pending[T] of a delayed submit, its event can be triggered or you will deadlock.
+  result = submitImpl(events, fnCall)
+
+macro submitOnEvent*(event: FlowEvent, fnCall: typed): untyped =
+  ## Submit the input function call asynchronously to the Weave runtime.
+  ## The function call will only be scheduled when the event is triggered.
+  ##
+  ## This is a compatibility routine for threads foreign to Weave (i.e. neither the root thread nor a worker thread).
   ## `setupSubmitterThread` MUST be called on the submitter thread beforehand
   ##
   ## This procedure is intended for interoperability with long-running threads
@@ -255,9 +284,9 @@ macro submitDelayed*(pledges: varargs[typed], fnCall: typed): untyped =
   ## use `spawn` otherwise.
   ##
   ## If the function calls returns a result, submit will wrap it in a Pending[T].
-  ## You can use `settle` to block the current thread and extract the asynchronous result from the Pending[T].
+  ## You can use `waitFor` to block the current thread and extract the asynchronous result from the Pending[T].
   ## You can use `isReady` to check if result is available and if subsequent
-  ## `settle` calls would block or return immediately.
+  ## `waitFor` calls would block or return immediately.
   ##
-  ## Ensure that before settling on the Pending[T] of a delayed submit, its pledge can be fulfilled or you will deadlock.
-  result = submitImpl(pledges, fnCall)
+  ## Ensure that before waiting on the Pending[T] of a delayed submit, its event can be triggered or you will deadlock.
+  result = submitImpl(nnkArgList.newTree(event), fnCall)
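To show how the renamed job API composes, here is a hedged sketch from the point of view of a thread foreign to Weave. It assumes the runtime is already running on another thread and that `setupSubmitterThread` has been called on this thread, as the docstrings above require; `loadAndProcess` is a hypothetical user procedure, and setup/teardown are omitted:

```Nim
# Sketch only: runtime setup, error handling and teardown are omitted.
proc loadAndProcess(path: string): int =
  # hypothetical placeholder for real work
  path.len

let inputReady = newFlowEvent()
# The job is enqueued only once `inputReady` is triggered.
let pending = submitOnEvent(inputReady, loadAndProcess("data.bin"))

# ... later, when the producer knows the input is in place:
inputReady.trigger()

echo waitFor(pending)   # blocks this foreign thread until the job completes
```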
diff --git a/weave/parallel_macros.nim b/weave/parallel_macros.nim
index c99f39a3..cce6a6c3 100644
--- a/weave/parallel_macros.nim
+++ b/weave/parallel_macros.nim
@@ -11,7 +11,7 @@ import
   # Internal
   ./datatypes/[sync_types, flowvars],
   ./contexts, ./scheduler,
-  ./cross_thread_com/[scoped_barriers, pledges]
+  ./cross_thread_com/[scoped_barriers, flow_events]
 
 # Parallel for utilities
 # ----------------------------------------------------------
@@ -147,18 +147,18 @@ proc extractFutureAndCaptures*(body: NimNode): tuple[future, captured, capturedT
     if body[i].kind == nnkCall:
       findCapturesAwaitable(i)
 
-proc extractPledges*(body: NimNode): NimNode =
-  ## Extract the dependencies in/out (pledges) if any
-  template findPledges(idx: int) =
-    if body[idx][0].eqIdent"dependsOn":
-      assert result.isNil, "The dependsOn section can only be set once for a loop."
+proc extractEvents*(body: NimNode): NimNode =
+  ## Extract the dependencies in/out (events) if any
+  template findEvents(idx: int) =
+    if body[idx][0].eqIdent"dependsOnEvent": # TODO, support multiple events
+      assert result.isNil, "The dependsOnEvent section can only be set once for a loop."
       result = body[idx][1][0]
-      # Remove the dependsOn section
+      # Remove the dependsOnEvent section
       body[idx] = nnkDiscardStmt.newTree(body[idx].toStrLit)
 
   for i in 0 ..< body.len-1:
     if body[i].kind == nnkCall:
-      findPledges(i)
+      findEvents(i)
 
 proc addSanityChecks*(statement, capturedTypes, capturedTypesSym: NimNode) =
   if capturedTypes.len > 0:
@@ -272,16 +272,16 @@ proc addLoopTask*(
       if not delayedUntil(`task`, `dependsOn`, myMemPool()):
         schedule(`task`)
     else:
-      let (pledge, pledgeIndex) = (dependsOn[0], dependsOn[1])
-      if pledgeIndex.kind == nnkIntLit and pledgeIndex.intVal == NoIter:
+      let (event, eventIndex) = (dependsOn[0], dependsOn[1])
+      if eventIndex.kind == nnkIntLit and eventIndex.intVal == NoIter:
         scheduleBlock = quote do:
-          if not delayedUntil(`task`, `pledge`, myMemPool()):
+          if not delayedUntil(`task`, `event`, myMemPool()):
             schedule(`task`)
       else:
         # This is a dependency on a loop index from ANOTHER loop
         # not the loop that is currently scheduled.
         scheduleBlock = quote do:
-          if not delayedUntil(`task`, `pledge`, int32(`pledgeIndex`), myMemPool()):
+          if not delayedUntil(`task`, `event`, int32(`eventIndex`), myMemPool()):
             schedule(`task`)
 
   # ---------------------------------------------------
diff --git a/weave/parallel_tasks.nim b/weave/parallel_tasks.nim
index 8c73ca95..70ffa3d5 100644
--- a/weave/parallel_tasks.nim
+++ b/weave/parallel_tasks.nim
@@ -21,10 +21,10 @@ import
   ./scheduler, ./contexts,
   ./datatypes/[flowvars, sync_types],
   ./instrumentation/contracts,
-  ./cross_thread_com/[scoped_barriers, pledges]
+  ./cross_thread_com/[scoped_barriers, flow_events]
 
-proc spawnImpl(pledges: NimNode, funcCall: NimNode): NimNode =
+proc spawnImpl(events: NimNode, funcCall: NimNode): NimNode =
   # We take typed argument so that overloading resolution
   # is already done and arguments are semchecked
   funcCall.expectKind(nnkCall)
@@ -72,24 +72,24 @@ proc spawnImpl(pledges: NimNode, funcCall: NimNode): NimNode =
   # Schedule immediately or delay on dependencies
   var scheduleBlock: NimNode
   let task = ident"task"
-  if pledges.isNil:
+  if events.isNil:
     scheduleBlock = newCall(bindSym"schedule", task)
-  elif pledges.len == 1:
-    let pledgeDesc = pledges[0]
-    if pledgeDesc.kind in {nnkIdent, nnkSym}:
+  elif events.len == 1:
+    let eventDesc = events[0]
+    if eventDesc.kind in {nnkIdent, nnkSym}:
       scheduleBlock = quote do:
-        if not delayedUntil(`task`, `pledgeDesc`, myMemPool()):
+        if not delayedUntil(`task`, `eventDesc`, myMemPool()):
           schedule(`task`)
     else:
-      pledgeDesc.expectKind({nnkPar, nnkTupleConstr})
-      let pledge = pledgeDesc[0]
-      let pledgeIndex = pledgeDesc[1]
+      eventDesc.expectKind({nnkPar, nnkTupleConstr})
+      let event = eventDesc[0]
+      let eventIndex = eventDesc[1]
       scheduleBlock = quote do:
-        if not delayedUntil(`task`, `pledge`, int32(`pledgeIndex`), myMemPool()):
+        if not delayedUntil(`task`, `event`, int32(`eventIndex`), myMemPool()):
           schedule(`task`)
   else:
     let delayedMulti = getAst(delayedUntilMulti(
-      task, newCall(bindSym"myMemPool"), pledges)
+      task, newCall(bindSym"myMemPool"), events)
     )
     scheduleBlock = quote do:
       if not `delayedMulti`:
@@ -207,16 +207,31 @@ macro spawn*(fnCall: typed): untyped =
   ## Tasks are processed approximately in Last-In-First-Out (LIFO) order
   result = spawnImpl(nil, fnCall)
 
-macro spawnDelayed*(pledges: varargs[typed], fnCall: typed): untyped =
+macro spawnOnEvents*(events: varargs[typed], fnCall: typed): untyped =
   ## Spawns the input function call asynchronously, potentially on another thread of execution.
-  ## The function call will only be scheduled when the pledge is fulfilled.
+  ## The function call will only be scheduled when the events are triggered.
   ##
   ## If the function calls returns a result, spawn will wrap it in a Flowvar.
   ## You can use sync to block the current thread and extract the asynchronous result from the flowvar.
-  ## spawnDelayed returns immediately.
   ##
-  ## Ensure that before syncing on the flowvar of a delayed spawn, its pledge can be fulfilled or you will deadlock.
-  result = spawnImpl(pledges, fnCall)
+  ## spawnOnEvents returns immediately.
+  ##
+  ## Ensure that before syncing on the flowvar of a delayed spawn,
+  ## its events can be triggered or you will deadlock.
+  result = spawnImpl(events, fnCall)
+
+macro spawnOnEvent*(event: FlowEvent, fnCall: typed): untyped =
+  ## Spawns the input function call asynchronously, potentially on another thread of execution.
+  ## The function call will only be scheduled when the event is triggered.
+  ##
+  ## If the function call returns a result, spawn will wrap it in a Flowvar.
+  ## You can use sync to block the current thread and extract the asynchronous result from the flowvar.
+  ##
+  ## spawnOnEvent returns immediately.
+  ##
+  ## Ensure that before syncing on the flowvar of a delayed spawn,
+  ## its event can be triggered or you will deadlock.
+  result = spawnImpl(nnkArgList.newTree(event), fnCall)
 
 # Sanity checks
 # --------------------------------------------------------
@@ -318,15 +333,15 @@ when isMainModule:
   block: # Delayed computation
 
-    proc echoA(pA: Pledge) =
+    proc echoA(eA: FlowEvent) =
       echo "Display A, sleep 1s, create parallel streams 1 and 2"
       sleep(1000)
-      pA.fulfill()
+      eA.trigger()
 
-    proc echoB1(pB1: Pledge) =
+    proc echoB1(eB1: FlowEvent) =
       echo "Display B1, sleep 1s"
       sleep(1000)
-      pB1.fulfill()
+      eB1.trigger()
 
     proc echoB2() =
       echo "Display B2, exit stream"
@@ -338,12 +353,12 @@ when isMainModule:
     proc main() =
       echo "Sanity check 3: Dataflow parallelism"
       init(Weave)
-      let pA = newPledge()
-      let pB1 = newPledge()
-      let done = spawnDelayed(pB1, echoC1())
-      spawnDelayed pA, echoB2()
-      spawnDelayed pA, echoB1(pB1)
-      spawn echoA(pA)
+      let eA = newFlowEvent()
+      let eB1 = newFlowEvent()
+      let done = spawnOnEvent(eB1, echoC1())
+      spawnOnEvent eA, echoB2()
+      spawnOnEvent eA, echoB1(eB1)
+      spawn echoA(eA)
       discard sync(done)
       exit(Weave)
 
@@ -351,19 +366,19 @@ when isMainModule:
   block: # Delayed computation with multiple dependencies
 
-    proc echoA(pA: Pledge) =
+    proc echoA(eA: FlowEvent) =
      echo "Display A, sleep 1s, create parallel streams 1 and 2"
      sleep(1000)
-      pA.fulfill()
+      eA.trigger()
 
-    proc echoB1(pB1: Pledge) =
+    proc echoB1(eB1: FlowEvent) =
       echo "Display B1, sleep 1s"
       sleep(1000)
-      pB1.fulfill()
+      eB1.trigger()
 
-    proc echoB2(pB2: Pledge) =
+    proc echoB2(eB2: FlowEvent) =
       echo "Display B2, no sleep"
-      pB2.fulfill()
+      eB2.trigger()
 
     proc echoC12() =
       echo "Display C12, exit stream"
 
     proc main() =
       echo "Sanity check 4: Dataflow parallelism with multiple dependencies"
       init(Weave)
-      let pA = newPledge()
-      let pB1 = newPledge()
-      let pB2 = newPledge()
-      spawnDelayed pB1, pB2, echoC12()
-      spawnDelayed pA, echoB2(pB2)
-      spawnDelayed pA, echoB1(pB1)
-      spawn echoA(pA)
+      let eA = newFlowEvent()
+      let eB1 = newFlowEvent()
+      let eB2 = newFlowEvent()
+      spawnOnEvents eB1, eB2, echoC12()
+      spawnOnEvent eA, echoB2(eB2)
+      spawnOnEvent eA, echoB1(eB1)
+      spawn echoA(eA)
       exit(Weave)
       echo "Weave runtime exited"
 
diff --git a/weave/state_machines/dispatch_events.nim b/weave/state_machines/dispatch_events.nim
index 178863ea..3535e646 100644
--- a/weave/state_machines/dispatch_events.nim
+++ b/weave/state_machines/dispatch_events.nim
@@ -20,7 +20,7 @@ proc nextTask*(childTask: static bool): Task {.inline.} =
   # Note:
   #   We distinguish jobs and tasks.
   #   - Jobs are submitted to Weave by external threads.
-  #     Jobs enqueued have all their pledges resolved and so are independent.
+  #     Jobs enqueued have all their events resolved and so are independent.
   #     To ensure fairness in worst-case scenario, we execute them in FIFO order.
   #     Jobs may be split into multiple tasks.
   #   - Tasks are spawned on Weave runtime, we want to process them as fast as possible.