From d93fce2ae4fe4e53bd43eeefb98aa5330e321a9c Mon Sep 17 00:00:00 2001 From: Mamy Ratsimbazafy Date: Sat, 4 Apr 2020 16:17:57 +0200 Subject: [PATCH] Workaround c++ pledge (#108) * Deactivate pledges when compiling to C++ * Add C++ to CI --- README.md | 25 +++---- azure-pipelines.yml | 21 +++--- weave.nim | 16 +++-- weave.nimble | 18 ++--- weave/channels/pledges.nim | 2 + weave/contexts.nim | 61 ++++++++-------- weave/parallel_for.nim | 54 ++++++++------- weave/parallel_macros.nim | 13 +++- weave/parallel_tasks.nim | 138 ++++++++++++++++++++----------------- 9 files changed, 188 insertions(+), 160 deletions(-) diff --git a/README.md b/README.md index 169d035..dfc3bda 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Weave, a state-of-the-art multithreading runtime [![Build Status: Travis](https://img.shields.io/travis/com/mratsim/weave/master?label=Travis%20%28Linux%20x86_64%2FARM64%29)](https://travis-ci.com/mratsim/weave) -[![Build Status: Azure](https://img.shields.io/azure-devops/build/numforge/69bc2700-4fa7-4292-a0b3-331ddb721640/2/master?label=Azure%20%28Linux%2064-bit%2C%20Windows%2064-bit%2C%20MacOS%2064-bit%29)](https://dev.azure.com/numforge/Weave/_build?definitionId=2&branchName=master) +[![Build Status: Azure](https://img.shields.io/azure-devops/build/numforge/69bc2700-4fa7-4292-a0b3-331ddb721640/2/master?label=Azure%20%28C%2FC%2B%2B%20Linux%2064-bit%2C%20Windows%2064-bit%2C%20MacOS%2064-bit%29)](https://dev.azure.com/numforge/Weave/_build?definitionId=2&branchName=master) [![License: Apache](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) @@ -10,7 +10,7 @@ _"Good artists borrow, great artists steal."_ -- Pablo Picasso Weave (codenamed "Project Picasso") is a multithreading runtime for the [Nim programming language](https://nim-lang.org/). -It is continuously tested on Linux, MacOS and Windows for the following CPU architectures: x86, x86_64 and ARM64. +It is continuously tested on Linux, MacOS and Windows for the following CPU architectures: x86, x86_64 and ARM64 with the C and C++ backends. Weave aims to provide a composable, high-performance, ultra-low overhead and fine-grained parallel runtime that frees developers from the common worries of "are my tasks big enough to be parallelized?", "what should be my grain size?", "what if the time they take is completely unknown or different?" or "is parallel-for worth it if it's just a matrix addition? On what CPUs? What if it's exponentiation?". @@ -45,12 +45,12 @@ instead of being based on traditional work-stealing with shared-memory deques. Weave can be simply installed with ```bash -nimble install weave@#master +nimble install weave ``` or for the devel version ```bash -nimble install weave +nimble install weave@#master ``` Weave requires at least Nim v1.2.0 @@ -203,15 +203,14 @@ For example on MacOS, the `pthread` implementation does not expose barrier funct ### C++ compilation -At the moment C++ compilation is not available on latest Nim + latest Weave. +Weave provides a "dataflow parallelism" feature that +allows: +- building a computation graph lazily +- by delaying parallel tasks depending on arbitrary conditions -The new "dataflow parallelism" feature that -allows delaying parallel tasks depending on arbitrary conditions -requires a data structure (`Pledge`) that is valid in C but invalid in C++. +It requires a data structure (`Pledge`) that is valid in C but invalid in C++ due to an incompatible mix of `Atomics` in `union type` and `flexible array member`. https://github.com/mratsim/weave/issues/95. -C++ compilation works with the following combination: -- Weave v0.3.0 -- Nim devel [@bf2e052e](https://github.com/nim-lang/Nim/commit/bf2e052e6d97c1117603480547804dd98d1ada71) +This feature is deactivated when compiling to C++. ### Windows 32-bit @@ -246,7 +245,7 @@ This means that a thread sleeping or stuck in a long computation may starve othe Experimental features might see API and/or implementation changes. -For example both parallelForStaged and parallelReduce allow for reduction but +For example both parallelForStaged and parallelReduce allow reductions but parallelForStaged is more flexible, it however requires explicit use of locks and/or atomics. LazyFlowvars may be enabled by default for certain sizes or if escape analysis become possible @@ -349,6 +348,8 @@ Or parallel reduce might be removed to only keep parallelForStaged. ### Dataflow parallelism +> Warning ⚠️: This feature is not available with the C++ backend. + Dataflow parallelism allows expressing fine-grained data dependencies between tasks. Concretly a task is delayed until all its dependencies are met and once met, it is triggered immediately. diff --git a/azure-pipelines.yml b/azure-pipelines.yml index a885e41..0503788 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -20,11 +20,11 @@ strategy: UCPU: amd64 CHANNEL: devel TEST_LANG: c - # Windows_cpp_devel_64bit: - # VM: 'windows-latest' - # UCPU: amd64 - # CHANNEL: devel - # TEST_LANG: cpp + Windows_cpp_devel_64bit: + VM: 'windows-latest' + UCPU: amd64 + CHANNEL: devel + TEST_LANG: cpp Linux_stable_64bit: VM: 'ubuntu-16.04' UCPU: amd64 @@ -35,15 +35,14 @@ strategy: UCPU: amd64 CHANNEL: devel TEST_LANG: c - # Linux_cpp_devel_64bit: - # VM: 'ubuntu-16.04' - # UCPU: amd64 - # CHANNEL: devel - # TEST_LANG: cpp + Linux_cpp_devel_64bit: + VM: 'ubuntu-16.04' + UCPU: amd64 + CHANNEL: devel + TEST_LANG: cpp # Linux_devel_32bit: # VM: 'ubuntu-16.04' - # ARCH: x86 # UCPU: i686 # CHANNEL: devel # TEST_LANG: c diff --git a/weave.nim b/weave.nim index 5e938cd..537f212 100644 --- a/weave.nim +++ b/weave.nim @@ -7,8 +7,7 @@ import weave/[parallel_tasks, parallel_for, parallel_for_staged, runtime, runtime_fsm, await_fsm], - weave/datatypes/flowvars, - weave/channels/pledges + weave/datatypes/flowvars export Flowvar, Weave, @@ -19,10 +18,17 @@ export isSpawned, getThreadId, # Experimental threadlocal prologue/epilogue - parallelForStaged, parallelForStagedStrided, + parallelForStaged, parallelForStagedStrided + +when not defined(cpp): # Experimental dataflow parallelism - spawnDelayed, Pledge, - fulfill, newPledge + import weave/channels/pledges + + export + spawnDelayed, Pledge, + fulfill, newPledge +else: + {.warning: "In C++ mode Pledges (for data flow parallelism) are not available, https://github.com/mratsim/weave/issues/95".} # TODO, those are workaround for not binding symbols in spawn macro import weave/contexts diff --git a/weave.nimble b/weave.nimble index 40a4e28..f6370a2 100644 --- a/weave.nimble +++ b/weave.nimble @@ -16,13 +16,13 @@ proc test(flags, path: string) = # Note: we compile in release mode. This still have stacktraces # but is much faster than -d:debug - # Compilation language is controlled by WEAVE_TEST_LANG + # Compilation language is controlled by TEST_LANG var lang = "c" if existsEnv"TEST_LANG": lang = getEnv"TEST_LANG" echo "\n========================================================================================" - echo "Running [", flags, "] ", path + echo "Running [ ", lang, " ", flags, " ] ", path echo "========================================================================================" exec "nim " & lang & " " & flags & " --verbosity:0 --hints:off --warnings:off --threads:on -d:release --outdir:build -r " & path @@ -30,7 +30,9 @@ task test, "Run Weave tests": test "", "weave/channels/channels_spsc_single.nim" test "", "weave/channels/channels_spsc_single_ptr.nim" test "", "weave/channels/channels_mpsc_unbounded_batch.nim" - test "", "weave/channels/pledges.nim" + + if not existsEnv"TEST_LANG" or getEnv"TEST_LANG" != "cpp": + test "", "weave/channels/pledges.nim" test "", "weave/datatypes/binary_worker_trees.nim" test "", "weave/datatypes/bounded_queues.nim" @@ -60,9 +62,8 @@ task test, "Run Weave tests": test "", "benchmarks/single_task_producer/weave_spc.nim" test "", "benchmarks/bouncing_producer_consumer/weave_bpc.nim" when defined(i386) or defined(amd64): - test "", "benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim" - # This is too slow - # test "", "benchmarks/matmul_gemm_blas/weave_gemm.nim" + if not existsEnv"TEST_LANG" or getEnv"TEST_LANG" != "cpp": + test "", "benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim" test "-d:WV_LazyFlowvar", "benchmarks/dfs/weave_dfs.nim" test "-d:WV_LazyFlowvar", "benchmarks/fibonacci/weave_fib.nim" @@ -73,6 +74,5 @@ task test, "Run Weave tests": test "-d:WV_LazyFlowvar", "benchmarks/single_task_producer/weave_spc.nim" test "-d:WV_LazyFlowvar", "benchmarks/bouncing_producer_consumer/weave_bpc.nim" when defined(i386) or defined(amd64): - test "-d:WV_LazyFlowvar", "benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim" - # This is too slow on Azure windows machines - # test "-d:WV_LazyFlowvar", "benchmarks/matmul_gemm_blas/weave_gemm.nim" + if not existsEnv"TEST_LANG" or getEnv"TEST_LANG" != "cpp": + test "-d:WV_LazyFlowvar", "benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim" diff --git a/weave/channels/pledges.nim b/weave/channels/pledges.nim index cbd45e8..4c2f3ad 100644 --- a/weave/channels/pledges.nim +++ b/weave/channels/pledges.nim @@ -15,6 +15,8 @@ import ../instrumentation/contracts, ../config +static: doAssert not defined(cpp), "Pledges are not compatible with C++ target at the moment: https://github.com/nim-lang/Nim/issues/13062, https://github.com/nim-lang/Nim/issues/13093" + # Pledges # ---------------------------------------------------- # Pledges are the counterpart to Flowvar. diff --git a/weave/contexts.nim b/weave/contexts.nim index 52706c5..380cc7b 100644 --- a/weave/contexts.nim +++ b/weave/contexts.nim @@ -7,7 +7,7 @@ import ./datatypes/[context_global, context_thread_local, sync_types, prell_deques, binary_worker_trees], - ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, pledges], + ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch], ./memory/[persistacks, lookaside_lists, memory_pools, allocs], ./config, ./instrumentation/[profilers, loggers, contracts] @@ -18,6 +18,9 @@ when defined(WV_metrics): Backoff: import ./channels/event_notifiers +when not defined(cpp): + import ./channels/pledges + # Contexts # ---------------------------------------------------------------------------------- @@ -131,34 +134,34 @@ proc flushAndDispose*(dq: var PrellDeque) = # Pledges # ---------------------------------------------------------------------------------- - -proc newPledge*(): Pledge = - ## Creates a pledge - ## Tasks associated with a pledge are only scheduled when the pledge is fulfilled. - ## A pledge can only be fulfilled once. - ## Pledges enable modeling precise producer-consumer data dependencies. - result.initialize(myMemPool()) - -proc newPledge*(start, stop, stride: SomeInteger): Pledge = - ## Creates a loop iteration pledge. - ## With a loop iteration pledge, tasks can be associated with a precise loop index. - ## - ## Tasks associated with a pledge are only scheduled when the pledge is fulfilled. - ## A pledge can only be fulfilled once. - ## Pledges enable modeling precise producer-consumer data dependencies. - result.initialize(myMemPool(), start.int32, stop.int32, stride.int32) - -proc fulfill*(pledge: Pledge) = - ## Fulfills a pledge - ## All ready tasks that depended on that pledge will be scheduled immediately. - ## A ready task is a task that has all its pledged dependencies fulfilled. - fulfillImpl(pledge, myWorker().deque, addFirst) - -proc fulfill*(pledge: Pledge, index: SomeInteger) = - ## Fulfills an iteration pledge - ## All ready tasks that depended on that pledge will be scheduled immediately. - ## A ready task is a task that has all its pledged dependencies fulfilled. - fulfillIterImpl(pledge, int32(index), myWorker().deque, addFirst) +when not defined(cpp): + proc newPledge*(): Pledge = + ## Creates a pledge + ## Tasks associated with a pledge are only scheduled when the pledge is fulfilled. + ## A pledge can only be fulfilled once. + ## Pledges enable modeling precise producer-consumer data dependencies. + result.initialize(myMemPool()) + + proc newPledge*(start, stop, stride: SomeInteger): Pledge = + ## Creates a loop iteration pledge. + ## With a loop iteration pledge, tasks can be associated with a precise loop index. + ## + ## Tasks associated with a pledge are only scheduled when the pledge is fulfilled. + ## A pledge can only be fulfilled once. + ## Pledges enable modeling precise producer-consumer data dependencies. + result.initialize(myMemPool(), start.int32, stop.int32, stride.int32) + + proc fulfill*(pledge: Pledge) = + ## Fulfills a pledge + ## All ready tasks that depended on that pledge will be scheduled immediately. + ## A ready task is a task that has all its pledged dependencies fulfilled. + fulfillImpl(pledge, myWorker().deque, addFirst) + + proc fulfill*(pledge: Pledge, index: SomeInteger) = + ## Fulfills an iteration pledge + ## All ready tasks that depended on that pledge will be scheduled immediately. + ## A ready task is a task that has all its pledged dependencies fulfilled. + fulfillIterImpl(pledge, int32(index), myWorker().deque, addFirst) # Dynamic Scopes # ---------------------------------------------------------------------------------- diff --git a/weave/parallel_for.nim b/weave/parallel_for.nim index 8584d13..d612327 100644 --- a/weave/parallel_for.nim +++ b/weave/parallel_for.nim @@ -17,7 +17,6 @@ import ./contexts, ./runtime, ./config, ./instrumentation/[loggers, contracts], ./datatypes/flowvars, ./await_fsm, - ./channels/pledges, ./parallel_tasks when not compileOption("threads"): @@ -404,34 +403,37 @@ when isMainModule: main5() echo "-------------------------" - block: - proc main6() = - init(Weave) + when not defined(cpp): + import ./channels/pledges - let pA = newPledge(0, 10, 1) - let pB = newPledge(0, 10, 1) + block: + proc main6() = + init(Weave) - parallelFor i in 0 ..< 10: - captures: {pA} - sleep(i * 10) - pA.fulfill(i) - echo "Step A - stream ", i, " at ", i * 10, " ms" + let pA = newPledge(0, 10, 1) + let pB = newPledge(0, 10, 1) - parallelFor i in 0 ..< 10: - dependsOn: (pA, i) - captures: {pB} - sleep(i * 10) - pB.fulfill(i) - echo "Step B - stream ", i, " at ", 2 * i * 10, " ms" + parallelFor i in 0 ..< 10: + captures: {pA} + sleep(i * 10) + pA.fulfill(i) + echo "Step A - stream ", i, " at ", i * 10, " ms" - parallelFor i in 0 ..< 10: - dependsOn: (pB, i) - sleep(i * 10) - echo "Step C - stream ", i, " at ", 3 * i * 10, " ms" + parallelFor i in 0 ..< 10: + dependsOn: (pA, i) + captures: {pB} + sleep(i * 10) + pB.fulfill(i) + echo "Step B - stream ", i, " at ", 2 * i * 10, " ms" - exit(Weave) + parallelFor i in 0 ..< 10: + dependsOn: (pB, i) + sleep(i * 10) + echo "Step C - stream ", i, " at ", 3 * i * 10, " ms" - echo "Dataflow loop parallelism" - echo "-------------------------" - main6() - echo "-------------------------" + exit(Weave) + + echo "Dataflow loop parallelism" + echo "-------------------------" + main6() + echo "-------------------------" diff --git a/weave/parallel_macros.nim b/weave/parallel_macros.nim index ba68c4e..e71c077 100644 --- a/weave/parallel_macros.nim +++ b/weave/parallel_macros.nim @@ -11,8 +11,15 @@ import # Internal ./datatypes/[sync_types, flowvars], ./contexts, ./instrumentation/profilers, - ./scheduler, - ./channels/pledges + ./scheduler + +when not defined(cpp): + import ./channels/pledges +else: + template delayedUntilMulti(task, pool: untyped, pledges: varargs[untyped]): untyped = + discard + + const NoIter = -1 # Parallel for utilities # ---------------------------------------------------------- @@ -306,7 +313,7 @@ proc addLoopTask*( `task`.cur = `start` `task`.stop = `stop` `task`.stride = `stride` - + `task`.futureSize = uint8(sizeof(`resultFutureType`.T)) `task`.hasFuture = true `task`.isLoop = true diff --git a/weave/parallel_tasks.nim b/weave/parallel_tasks.nim index f13add9..51074b8 100644 --- a/weave/parallel_tasks.nim +++ b/weave/parallel_tasks.nim @@ -14,8 +14,13 @@ import # Internal ./scheduler, ./contexts, ./await_fsm, ./datatypes/[flowvars, sync_types], - ./instrumentation/[contracts, profilers], - ./channels/pledges + ./instrumentation/[contracts, profilers] + +when not defined(cpp): + import ./channels/pledges +else: + template delayedUntilMulti(task, pool: untyped, pledges: varargs[untyped]): untyped = + discard # workaround visibility issues export forceFuture @@ -243,66 +248,69 @@ when isMainModule: main2() - block: # Delayed computation - - proc echoA(pA: Pledge) = - echo "Display A, sleep 1s, create parallel streams 1 and 2" - sleep(1000) - pA.fulfill() - - proc echoB1(pB1: Pledge) = - echo "Display B1, sleep 1s" - sleep(1000) - pB1.fulfill() - - proc echoB2() = - echo "Display B2, exit stream" - - proc echoC1() = - echo "Display C1, exit stream" - - proc main() = - echo "Sanity check 3: Dataflow parallelism" - init(Weave) - let pA = newPledge() - let pB1 = newPledge() - spawnDelayed pB1, echoC1() - spawnDelayed pA, echoB2() - spawnDelayed pA, echoB1(pB1) - spawn echoA(pA) - exit(Weave) - - main() - - block: # Delayed computation with multiple dependencies - - proc echoA(pA: Pledge) = - echo "Display A, sleep 1s, create parallel streams 1 and 2" - sleep(1000) - pA.fulfill() - - proc echoB1(pB1: Pledge) = - echo "Display B1, sleep 1s" - sleep(1000) - pB1.fulfill() - - proc echoB2(pB2: Pledge) = - echo "Display B2, no sleep" - pB2.fulfill() - - proc echoC12() = - echo "Display C12, exit stream" - - proc main() = - echo "Sanity check 4: Dataflow parallelism with multiple dependencies" - init(Weave) - let pA = newPledge() - let pB1 = newPledge() - let pB2 = newPledge() - spawnDelayed pB1, pB2, echoC12() - spawnDelayed pA, echoB2(pB2) - spawnDelayed pA, echoB1(pB1) - spawn echoA(pA) - exit(Weave) - - main() + when not defined(cpp): + import ./channels/pledges + + block: # Delayed computation + + proc echoA(pA: Pledge) = + echo "Display A, sleep 1s, create parallel streams 1 and 2" + sleep(1000) + pA.fulfill() + + proc echoB1(pB1: Pledge) = + echo "Display B1, sleep 1s" + sleep(1000) + pB1.fulfill() + + proc echoB2() = + echo "Display B2, exit stream" + + proc echoC1() = + echo "Display C1, exit stream" + + proc main() = + echo "Sanity check 3: Dataflow parallelism" + init(Weave) + let pA = newPledge() + let pB1 = newPledge() + spawnDelayed pB1, echoC1() + spawnDelayed pA, echoB2() + spawnDelayed pA, echoB1(pB1) + spawn echoA(pA) + exit(Weave) + + main() + + block: # Delayed computation with multiple dependencies + + proc echoA(pA: Pledge) = + echo "Display A, sleep 1s, create parallel streams 1 and 2" + sleep(1000) + pA.fulfill() + + proc echoB1(pB1: Pledge) = + echo "Display B1, sleep 1s" + sleep(1000) + pB1.fulfill() + + proc echoB2(pB2: Pledge) = + echo "Display B2, no sleep" + pB2.fulfill() + + proc echoC12() = + echo "Display C12, exit stream" + + proc main() = + echo "Sanity check 4: Dataflow parallelism with multiple dependencies" + init(Weave) + let pA = newPledge() + let pB1 = newPledge() + let pB2 = newPledge() + spawnDelayed pB1, pB2, echoC12() + spawnDelayed pA, echoB2(pB2) + spawnDelayed pA, echoB1(pB1) + spawn echoA(pA) + exit(Weave) + + main()