From ae084592419827cf43e8e6c4fe330d04751c6ff3 Mon Sep 17 00:00:00 2001
From: Yakup Budanaz
Date: Fri, 13 Dec 2024 15:47:40 +0100
Subject: [PATCH] Refactor and support CPU_Pinned

---
 dace/codegen/targets/cpp.py  |  5 +--
 dace/codegen/targets/cuda.py | 54 ++++++++++++++++---------
 dace/sdfg/validation.py      |  8 ++--
 tests/deferred_alloc_test.py | 77 ++++++++++++++++++++++++++++++++----
 4 files changed, 112 insertions(+), 32 deletions(-)

diff --git a/dace/codegen/targets/cpp.py b/dace/codegen/targets/cpp.py
index 26b34637a3..8357ca1fa8 100644
--- a/dace/codegen/targets/cpp.py
+++ b/dace/codegen/targets/cpp.py
@@ -592,8 +592,7 @@ def replace_pattern(match):
 
 
 def _get_deferred_size_names(desc, name):
-    if (desc.storage != dtypes.StorageType.GPU_Global and
-        desc.storage != dtypes.StorageType.CPU_Heap and
+    if (desc.storage not in dtypes.REALLOCATABLE_STORAGES and
         not desc.transient):
         return None
     def check_dace_defer(elements):
@@ -603,7 +602,7 @@ def check_dace_defer(elements):
         return False
     deferred_size_names = None
     if check_dace_defer(desc.shape):
-        if desc.storage == dtypes.StorageType.GPU_Global or desc.storage == dtypes.StorageType.CPU_Heap:
+        if desc.storage in dtypes.REALLOCATABLE_STORAGES:
             deferred_size_names = []
             for i, elem in enumerate(desc.shape):
                 if "__dace_defer" in str(elem):
diff --git a/dace/codegen/targets/cuda.py b/dace/codegen/targets/cuda.py
index 2eacaae132..418cbbfdbd 100644
--- a/dace/codegen/targets/cuda.py
+++ b/dace/codegen/targets/cuda.py
@@ -136,6 +136,7 @@ def __init__(self, frame_codegen: 'DaCeCodeGenerator', sdfg: SDFG):
                 dispatcher.register_copy_dispatcher(dtypes.StorageType.Register, st, sched_type, illegal_copy)
 
         dispatcher.register_reallocate_dispatcher(dtypes.StorageType.GPU_Global, self)
+        dispatcher.register_reallocate_dispatcher(dtypes.StorageType.CPU_Pinned, self)
         # End of illegal copies
         # End of dispatcher registration
         ######################################
@@ -606,11 +607,12 @@ def allocate_array(self, sdfg: SDFG, cfg: ControlFlowRegion, dfg: StateSubgraphV
         arrsize = nodedesc.total_size
         is_dynamically_sized = symbolic.issymbolic(arrsize, sdfg.constants)
         arrsize_malloc = '%s * sizeof(%s)' % (sym2cpp(arrsize), nodedesc.dtype.ctype)
+        ctypedef = '%s *' % nodedesc.dtype.ctype
 
         deferred_allocation = any([s for s in nodedesc.shape if "__dace_defer" in str(s)])
 
-        # Different types of GPU arrays
-        if nodedesc.storage == dtypes.StorageType.GPU_Global:
+        # Different types of GPU arrays
+        if nodedesc.storage in dtypes.REALLOCATABLE_STORAGES:
             if not declared:
                 declaration_stream.write('%s %s;\n' % (ctypedef, dataname))
                 self._dispatcher.defined_vars.add(dataname, DefinedType.Pointer, ctypedef)
@@ -2808,27 +2810,43 @@ def reallocate(
         tmp_storage_name = "__tmp_realloc_move_storage"
 
         callsite_stream.write(f"if ({dst_node.data} == nullptr) {{", cfg, state_id, dst_node.guid)
-        self._alloc_gpu_global(dst_node, data, callsite_stream, data_name, new_size_str)
+        if data.storage == dtypes.StorageType.GPU_Global:
+            self._alloc_gpu_global(dst_node, data, callsite_stream, data_name, new_size_str)
+        else:
+            assert data.storage == dtypes.StorageType.CPU_Pinned
+            callsite_stream.write(f"DACE_GPU_CHECK({self.backend}MallocHost(reinterpret_cast<void**>(&{data_name}), {new_size_str}));", cfg, state_id, dst_node.guid)
         callsite_stream.write("} else {\n", cfg, state_id, dst_node.guid)
         callsite_stream.write(f"{dtype}* {tmp_storage_name};")
-        self._alloc_gpu_global(None, data, callsite_stream, tmp_storage_name, new_size_str)
+        if data.storage == dtypes.StorageType.GPU_Global:
+            self._alloc_gpu_global(None, data, callsite_stream, tmp_storage_name, new_size_str)
+        else:
+            assert data.storage == dtypes.StorageType.CPU_Pinned
+            callsite_stream.write(f"DACE_GPU_CHECK({self.backend}MallocHost(reinterpret_cast<void**>(&{tmp_storage_name}), {new_size_str}));", cfg, state_id, dst_node.guid)
+
         s = ""
-        if not data.pool: # If pooled, will be freed somewhere else
-            copy_size_str = f"Min({old_size_str}, {new_size_str})"
-            s += f"DACE_GPU_CHECK({self.backend}Memcpy(static_cast({tmp_storage_name}), static_cast({data_name}), {copy_size_str}, cudaMemcpyDeviceToDevice));\n"
-            s += f"DACE_GPU_CHECK({self.backend}Free({data_name}));\n"
+        copy_size_str = f"Min({old_size_str}, {new_size_str})"
+        if data.storage == dtypes.StorageType.GPU_Global:
+            if not data.pool: # If pooled, will be freed somewhere else
+                s += f"DACE_GPU_CHECK({self.backend}Memcpy(static_cast<void*>({tmp_storage_name}), static_cast<void*>({data_name}), {copy_size_str}, cudaMemcpyDeviceToDevice));\n"
+                s += f"DACE_GPU_CHECK({self.backend}Free({data_name}));\n"
+                s += f"{data_name} = {tmp_storage_name};\n"
+            else:
+                cudastream = getattr(dst_node, '_cuda_stream', 'nullptr')
+                if cudastream != 'nullptr':
+                    cudastream = f'__state->gpu_context->streams[{cudastream}]'
+                s += f'DACE_GPU_CHECK({self.backend}MallocAsync(reinterpret_cast<void**>(&{tmp_storage_name}), {new_size_str}, {cudastream}));\n'
+                s += f"DACE_GPU_CHECK({self.backend}MemcpyAsync(static_cast<void*>({tmp_storage_name}), static_cast<void*>({data_name}), {copy_size_str}, cudaMemcpyDeviceToDevice, {cudastream}));\n"
+                s += f"DACE_GPU_CHECK({self.backend}FreeAsync({data_name}, {cudastream}));\n"
+                callsite_stream.write(s)
+                self._emit_sync(callsite_stream)
+                callsite_stream.write(f"{data_name} = {tmp_storage_name};\n")
+                s = ""
+        elif data.storage == dtypes.StorageType.CPU_Pinned:
+            s += f"DACE_GPU_CHECK({self.backend}Memcpy(static_cast<void*>({tmp_storage_name}), static_cast<void*>({data_name}), {copy_size_str}, cudaMemcpyHostToHost));\n"
+            s += f"DACE_GPU_CHECK({self.backend}FreeHost({data_name}));\n"
             s += f"{data_name} = {tmp_storage_name};\n"
         else:
-            cudastream = getattr(dst_node, '_cuda_stream', 'nullptr')
-            if cudastream != 'nullptr':
-                cudastream = f'__state->gpu_context->streams[{cudastream}]'
-            s += f'DACE_GPU_CHECK({self.backend}MallocAsync(static_cast(&{data_name}), {new_size_str}, {cudastream}));\n'
-            s += f"DACE_GPU_CHECK({self.backend}MemcpyAsync(static_cast({tmp_storage_name}), static_cast({data_name}), {copy_size_str}, {cudastream}), cudaMemcpyDeviceToDevice));\n"
-            s += f"DACE_GPU_CHECK({self.backend}FreeAsync({data_name}, {cudastream}));\n"
-            callsite_stream.write(s)
-            self._emit_sync(callsite_stream)
-            callsite_stream.write(f"{data_name} = {tmp_storage_name};\n")
-            s = ""
+            raise Exception("Reallocation in the CUDA code generator requires GPU_Global or CPU_Pinned storage")
         s += "}\n"
         callsite_stream.write(s)
 
diff --git a/dace/sdfg/validation.py b/dace/sdfg/validation.py
index cd37612185..c526d0e8a7 100644
--- a/dace/sdfg/validation.py
+++ b/dace/sdfg/validation.py
@@ -359,9 +359,9 @@ def validate_sdfg(sdfg: 'dace.sdfg.SDFG', references: Set[int] = None, **context
                     f"Deferred arrays can't be returned. {desc} has __return in its name."
                     , sdfg, None
                 )
-            if desc.storage is not dtypes.StorageType.GPU_Global and desc.storage is not dtypes.StorageType.CPU_Heap:
+            if desc.storage not in dtypes.REALLOCATABLE_STORAGES:
                 raise InvalidSDFGError(
-                    f"Deferred arrays are supported only for {dtypes.StorageType.GPU_Global} and {dtypes.StorageType.CPU_Heap} storage types for {desc}."
+                    f"Deferred arrays are supported only for {dtypes.REALLOCATABLE_STORAGES} storage types for {desc}."
                     , sdfg, None
                 )
 
@@ -616,10 +616,10 @@ def validate_state(state: 'dace.sdfg.SDFGState',
             # Reading-Writing the size is valid only if the array is transient and has the storage type CPU_Heap or GPU_Global
             has_writes = len(write_size_edges) > 0
             has_writes_or_reads = len(read_size_edges) + len(write_size_edges) > 0
-            size_access_allowed = arr.transient and (arr.storage == dtypes.StorageType.CPU_Heap or arr.storage == dtypes.StorageType.GPU_Global)
+            size_access_allowed = arr.transient and (arr.storage in dtypes.REALLOCATABLE_STORAGES)
             if has_writes_or_reads and not size_access_allowed:
                 raise InvalidSDFGNodeError('Reading the size of an array, or changing (writing to) the size of an array '
-                                           'is only valid if the array is transient and the storage is CPU_Heap or GPU_Global', sdfg, state_id, nid)
+                                           f'is only valid if the array is transient and the storage is in {dtypes.REALLOCATABLE_STORAGES}', sdfg, state_id, nid)
 
             if has_writes and scope[node] is not None:
                 raise InvalidSDFGNodeError('Resizing array is not allowed within a scope (e.g. not inside maps)', sdfg, state_id, nid)
diff --git a/tests/deferred_alloc_test.py b/tests/deferred_alloc_test.py
index eee4482ae5..1d9df3a200 100644
--- a/tests/deferred_alloc_test.py
+++ b/tests/deferred_alloc_test.py
@@ -5,7 +5,7 @@
 import pytest
 
 
-@pytest.fixture(params=[dace.dtypes.StorageType.CPU_Heap, dace.dtypes.StorageType.GPU_Global])
+@pytest.fixture(params=[dace.dtypes.StorageType.CPU_Heap, dace.dtypes.StorageType.GPU_Global, dace.dtypes.StorageType.CPU_Pinned])
 def storage_type(request):
     return request.param
 
@@ -19,6 +19,8 @@ def schedule_type(storage_type):
         return dace.dtypes.ScheduleType.Sequential
     elif storage_type == dace.dtypes.StorageType.GPU_Global:
         return dace.dtypes.ScheduleType.GPU_Device
+    elif storage_type == dace.dtypes.StorageType.CPU_Pinned:
+        return dace.dtypes.ScheduleType.Sequential
 
 def _get_trivial_alloc_sdfg(storage_type: dace.dtypes.StorageType, transient: bool, write_size="0:2"):
     sdfg = dace.sdfg.SDFG(name=f"deferred_alloc_test_1")
@@ -77,6 +79,8 @@ def _get_assign_map_sdfg(storage_type: dace.dtypes.StorageType, transient: bool,
         assert (schedule_type == dace.dtypes.ScheduleType.Sequential or schedule_type == dace.dtypes.ScheduleType.CPU_Multicore)
     elif storage_type == dace.dtypes.StorageType.GPU_Global:
         assert (schedule_type == dace.dtypes.ScheduleType.GPU_Device)
+    elif storage_type == dace.dtypes.StorageType.CPU_Pinned:
+        assert (schedule_type == dace.dtypes.ScheduleType.Sequential)
 
     an_3.add_out_connector('_read_size')
     map_entry2, map_exit2 = state.add_map(name="map2",ndrange={"i":dace.subsets.Range([(0,15-1,1)]),"j":dace.subsets.Range([(0,"__A_dim1_size-1", 1)])},
@@ -101,7 +105,7 @@ def _get_assign_map_sdfg(storage_type: dace.dtypes.StorageType, transient: bool,
     return sdfg
 
 def _valid_to_reallocate(transient, storage_type):
-    return transient and (storage_type == dace.dtypes.StorageType.GPU_Global or storage_type == dace.dtypes.StorageType.CPU_Heap)
+    return transient and (storage_type in dace.dtypes.REALLOCATABLE_STORAGES)
 
 def _test_trivial_realloc(storage_type: dace.dtypes.StorageType, transient: bool):
     sdfg = _get_trivial_alloc_sdfg(storage_type, transient)
@@ -138,12 +142,12 @@ def _test_realloc_use(storage_type: dace.dtypes.StorageType, transient: bool, sc
         raise AssertionError("Realloc-use with non-transient data did not fail when it was expected to.")
 
     compiled_sdfg = sdfg.compile()
-    if storage_type == dace.dtypes.StorageType.CPU_Heap:
+    if storage_type == dace.dtypes.StorageType.CPU_Heap or storage_type == dace.dtypes.StorageType.CPU_Pinned:
         arr = numpy.array([-1.0]).astype(numpy.float32)
         user_size = numpy.array([10, 10]).astype(numpy.uint64)
         compiled_sdfg(user_size=user_size, example_array=arr)
         assert ( arr[0] == 3.0 )
-    if storage_type == dace.dtypes.StorageType.GPU_Global:
+    elif storage_type == dace.dtypes.StorageType.GPU_Global:
         try:
             import cupy
         except Exception:
@@ -158,12 +162,12 @@ def _test_realloc_use(storage_type: dace.dtypes.StorageType, transient: bool, sc
     sdfg.apply_transformations_repeated([StateFusion, RedundantArray, RedundantSecondArray])
     sdfg.validate()
     compiled_sdfg = sdfg.compile()
-    if storage_type == dace.dtypes.StorageType.CPU_Heap:
+    if storage_type == dace.dtypes.StorageType.CPU_Heap or storage_type == dace.dtypes.StorageType.CPU_Pinned:
         arr = numpy.array([-1.0]).astype(numpy.float32)
         user_size = numpy.array([10, 10]).astype(numpy.uint64)
         compiled_sdfg(user_size=user_size, example_array=arr)
         assert ( arr[0] == 3.0 )
-    if storage_type == dace.dtypes.StorageType.GPU_Global:
+    elif storage_type == dace.dtypes.StorageType.GPU_Global:
         try:
             import cupy
         except Exception:
@@ -181,10 +185,18 @@ def test_realloc_use_gpu(transient: bool):
 def test_realloc_use_cpu(transient: bool):
     _test_realloc_use(dace.dtypes.StorageType.CPU_Heap, transient, dace.dtypes.ScheduleType.Sequential)
 
+@pytest.mark.gpu
+def test_realloc_use_cpu_pinned(transient: bool):
+    _test_realloc_use(dace.dtypes.StorageType.CPU_Pinned, transient, dace.dtypes.ScheduleType.Sequential)
+
 @pytest.mark.gpu
 def test_trivial_realloc_gpu(transient: bool):
     _test_trivial_realloc(dace.dtypes.StorageType.GPU_Global, transient)
 
+@pytest.mark.gpu
+def test_trivial_realloc_cpu_pinned(transient: bool):
+    _test_trivial_realloc(dace.dtypes.StorageType.CPU_Pinned, transient)
+
 def test_trivial_realloc_cpu(transient: bool):
     _test_trivial_realloc(dace.dtypes.StorageType.CPU_Heap, transient)
 
@@ -220,6 +232,17 @@ def test_realloc_inside_map_gpu():
     pytest.fail("Realloc-use with non-transient data and incomplete write did not fail when it was expected to.")
 
 
+def test_realloc_inside_map_cpu_pinned():
+    sdfg =_get_assign_map_sdfg(dace.dtypes.StorageType.CPU_Pinned, True, dace.dtypes.ScheduleType.Sequential)
+    _add_realloc_inside_map(sdfg, dace.dtypes.ScheduleType.Sequential)
+    try:
+        sdfg.validate()
+    except Exception:
+        return
+
+    pytest.fail("Realloc-use with non-transient data and incomplete write did not fail when it was expected to.")
+
+
 def test_realloc_inside_map_cpu():
     sdfg =_get_assign_map_sdfg(dace.dtypes.StorageType.CPU_Heap, True, dace.dtypes.ScheduleType.CPU_Multicore)
     _add_realloc_inside_map(sdfg, dace.dtypes.ScheduleType.CPU_Multicore)
@@ -275,6 +298,8 @@ def _get_conditional_alloc_sdfg(storage_type: dace.dtypes.StorageType, transient
         assert (schedule_type == dace.dtypes.ScheduleType.Sequential or schedule_type == dace.dtypes.ScheduleType.CPU_Multicore)
     elif storage_type == dace.dtypes.StorageType.GPU_Global:
         assert (schedule_type == dace.dtypes.ScheduleType.GPU_Device)
+    elif storage_type == dace.dtypes.StorageType.CPU_Pinned:
+        assert (schedule_type == dace.dtypes.ScheduleType.Sequential)
 
     an_3 = state.add_access('A')
     an_3.add_out_connector('_read_size')
@@ -321,6 +346,7 @@ def _get_conditional_alloc_sdfg(storage_type: dace.dtypes.StorageType, transient
 
     return sdfg
 
+@pytest.mark.gpu
 def test_conditional_alloc_gpu():
     sdfg =_get_conditional_alloc_sdfg(dace.dtypes.StorageType.GPU_Global, True, dace.dtypes.ScheduleType.GPU_Device)
     sdfg.validate()
@@ -335,6 +361,16 @@ def test_conditional_alloc_gpu():
     sdfg(path=1, size1=size1, size2=size2, example_array=arr)
     assert ( arr.get()[0] == 3.0 )
 
+@pytest.mark.gpu
+def test_conditional_alloc_cpu_pinned():
+    sdfg =_get_conditional_alloc_sdfg(dace.dtypes.StorageType.CPU_Pinned, True, dace.dtypes.ScheduleType.Sequential)
+    sdfg.validate()
+    size1 = numpy.array([1, 1]).astype(numpy.uint64)
+    size2 = numpy.array([22, 22]).astype(numpy.uint64)
+    arr = numpy.array([-1.0]).astype(numpy.float32)
+    sdfg(path=1, size1=size1, size2=size2, example_array=arr)
+    assert ( arr[0] == 3.0 )
+
 def test_conditional_alloc_cpu():
     sdfg =_get_conditional_alloc_sdfg(dace.dtypes.StorageType.CPU_Heap, True, dace.dtypes.ScheduleType.CPU_Multicore)
     sdfg.validate()
@@ -344,6 +380,7 @@ def test_conditional_alloc_cpu():
     sdfg(path=0, size1=size1, size2=size2, example_array=arr)
     assert ( arr[0] == 3.0 )
 
+@pytest.mark.gpu
 def test_conditional_alloc_with_expr_gpu():
     sdfg =_get_conditional_alloc_sdfg(dace.dtypes.StorageType.GPU_Global, True, dace.dtypes.ScheduleType.GPU_Device, True)
     sdfg.validate()
@@ -358,6 +395,16 @@ def test_conditional_alloc_with_expr_gpu():
     sdfg(path=1, size1=size1, size2=size2, example_array=arr)
     assert ( arr.get()[0] == 3.0 )
 
+@pytest.mark.gpu
+def test_conditional_alloc_with_expr_cpu_pinned():
+    sdfg =_get_conditional_alloc_sdfg(dace.dtypes.StorageType.CPU_Pinned, True, dace.dtypes.ScheduleType.Sequential, True)
+    sdfg.validate()
+    size1 = numpy.array([1, 1]).astype(numpy.uint64)
+    size2 = numpy.array([22, 22]).astype(numpy.uint64)
+    arr = numpy.array([-1.0]).astype(numpy.float32)
+    sdfg(path=1, size1=size1, size2=size2, example_array=arr)
+    assert ( arr[0] == 3.0 )
+
 def test_conditional_alloc_with_expr_cpu():
     sdfg =_get_conditional_alloc_sdfg(dace.dtypes.StorageType.CPU_Heap, True, dace.dtypes.ScheduleType.CPU_Multicore, True)
     sdfg.validate()
@@ -391,24 +438,36 @@ def test_incomplete_write_dimensions_2():
     test_realloc_inside_map_cpu()
     print(f"Trivial Realloc within map, gpu")
     test_realloc_inside_map_gpu()
+    print(f"Trivial Realloc within map, cpu pinned")
+    test_realloc_inside_map_cpu_pinned()
 
     print(f"Trivial Realloc with storage, cpu")
     test_trivial_realloc_cpu(True)
     print(f"Trivial Realloc-Use with storage, cpu")
     test_realloc_use_cpu(True)
+    print(f"Trivial Realloc-Use with storage, cpu pinned")
+    test_realloc_use_cpu_pinned(True)
 
     print(f"Trivial Realloc with storage, gpu")
     test_trivial_realloc_gpu(True)
     print(f"Trivial Realloc-Use with storage, gpu")
     test_realloc_use_gpu(True)
+    print(f"Trivial Realloc-Use with storage, cpu pinned")
+    test_realloc_use_cpu_pinned(True)
+
     print(f"Trivial Realloc with storage, cpu, on non-transient data")
     test_trivial_realloc_cpu(False)
     print(f"Trivial Realloc-Use with storage, cpu, on non-transient data")
     test_realloc_use_cpu(False)
 
     print(f"Trivial Realloc with storage, gpu, on non-transient data")
     test_trivial_realloc_gpu(False)
+    print(f"Trivial Realloc-Use with storage, gpu, on non-transient data")
     test_realloc_use_gpu(False)
+    print(f"Trivial Realloc with storage, cpu pinned, on non-transient data")
+    test_trivial_realloc_cpu_pinned(False)
+    print(f"Trivial Realloc-Use with storage, cpu pinned, on non-transient data")
+    test_realloc_use_cpu_pinned(False)
 
     print(f"Realloc with incomplete write one, validation")
     test_incomplete_write_dimensions_1()
@@ -419,8 +478,12 @@ def test_incomplete_write_dimensions_2():
     test_conditional_alloc_cpu()
     print(f"Test conditional alloc with use, gpu")
     test_conditional_alloc_gpu()
+    print(f"Test conditional alloc with use, cpu pinned")
+    test_conditional_alloc_cpu_pinned()
 
     print(f"Test conditional alloc with use and the shape as a non-trivial expression, cpu")
     test_conditional_alloc_with_expr_cpu()
     print(f"Test conditional alloc with use and the shape as a non-trivial expression, gpu")
-    test_conditional_alloc_with_expr_gpu()
\ No newline at end of file
+    test_conditional_alloc_with_expr_gpu()
+    print(f"Test conditional alloc with use and the shape as a non-trivial expression, cpu pinned")
+    test_conditional_alloc_with_expr_cpu_pinned()
\ No newline at end of file
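
Reviewer note (supplementary, not part of the patch): the CPU_Pinned branch added to reallocate() above emits a grow-and-copy sequence built from cudaMallocHost, a host-to-host cudaMemcpy of min(old size, new size) bytes, and cudaFreeHost. The following is a minimal standalone CUDA sketch of that pattern; the helper name realloc_pinned and the CHECK macro are illustrative stand-ins for the generated code, which uses DACE_GPU_CHECK and operates on the SDFG's data pointers.

// Minimal sketch of the pinned-host reallocation pattern (illustrative only).
#include <cstdio>
#include <cstdlib>
#include <algorithm>
#include <cuda_runtime.h>

#define CHECK(call)                                                    \
    do {                                                               \
        cudaError_t err__ = (call);                                    \
        if (err__ != cudaSuccess) {                                    \
            std::fprintf(stderr, "%s\n", cudaGetErrorString(err__));   \
            std::exit(1);                                              \
        }                                                              \
    } while (0)

// Reallocate a host-pinned buffer, preserving min(old_size, new_size) bytes.
static void realloc_pinned(float** ptr, size_t old_size, size_t new_size) {
    if (*ptr == nullptr) {
        // First (deferred) allocation: nothing to preserve yet.
        CHECK(cudaMallocHost(reinterpret_cast<void**>(ptr), new_size));
        return;
    }
    float* tmp = nullptr;
    CHECK(cudaMallocHost(reinterpret_cast<void**>(&tmp), new_size));
    // Pinned memory lives on the host, so the copy kind is host-to-host.
    CHECK(cudaMemcpy(tmp, *ptr, std::min(old_size, new_size), cudaMemcpyHostToHost));
    CHECK(cudaFreeHost(*ptr));
    *ptr = tmp;
}

int main() {
    float* a = nullptr;                                      // deferred: not yet allocated
    realloc_pinned(&a, 0, 4 * sizeof(float));                // first allocation
    for (int i = 0; i < 4; ++i) a[i] = float(i);
    realloc_pinned(&a, 4 * sizeof(float), 16 * sizeof(float)); // grow, keeping old contents
    std::printf("a[3] = %f\n", a[3]);                        // prints 3.0
    CHECK(cudaFreeHost(a));
    return 0;
}

Unlike the pooled GPU_Global path, which uses MallocAsync/MemcpyAsync/FreeAsync on the array's CUDA stream and synchronizes afterwards, the pinned-host path is synchronous, since the data resides in page-locked host memory.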