Refactor and support CPU_Pinned

ThrudPrimrose committed Dec 13, 2024
1 parent 80f6b4a commit ae08459

Showing 4 changed files with 112 additions and 32 deletions.
5 changes: 2 additions & 3 deletions dace/codegen/targets/cpp.py
@@ -592,8 +592,7 @@ def replace_pattern(match):
 
 
 def _get_deferred_size_names(desc, name):
-    if (desc.storage != dtypes.StorageType.GPU_Global and
-        desc.storage != dtypes.StorageType.CPU_Heap and
+    if (desc.storage not in dtypes.REALLOCATABLE_STORAGES and
         not desc.transient):
         return None
     def check_dace_defer(elements):
@@ -603,7 +602,7 @@ def check_dace_defer(elements):
         return False
     deferred_size_names = None
     if check_dace_defer(desc.shape):
-        if desc.storage == dtypes.StorageType.GPU_Global or desc.storage == dtypes.StorageType.CPU_Heap:
+        if desc.storage in dtypes.REALLOCATABLE_STORAGES:
             deferred_size_names = []
             for i, elem in enumerate(desc.shape):
                 if "__dace_defer" in str(elem):
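Note: dtypes.REALLOCATABLE_STORAGES, which this refactor keys on, is not defined in any of the four files shown in this commit. Judging from the hard-coded checks it replaces, it is presumably a set along these lines (an illustrative sketch, not code from this diff):

    # Presumed definition of dtypes.REALLOCATABLE_STORAGES -- an assumption,
    # not part of this diff; it replaces the hard-coded storage checks above.
    REALLOCATABLE_STORAGES = {
        StorageType.CPU_Heap,    # heap-allocated host memory
        StorageType.GPU_Global,  # device global memory
        StorageType.CPU_Pinned,  # page-locked (pinned) host memory, new in this commit
    }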
54 changes: 36 additions & 18 deletions dace/codegen/targets/cuda.py
@@ -136,6 +136,7 @@ def __init__(self, frame_codegen: 'DaCeCodeGenerator', sdfg: SDFG):
                 dispatcher.register_copy_dispatcher(dtypes.StorageType.Register, st, sched_type, illegal_copy)
 
         dispatcher.register_reallocate_dispatcher(dtypes.StorageType.GPU_Global, self)
+        dispatcher.register_reallocate_dispatcher(dtypes.StorageType.CPU_Pinned, self)
         # End of illegal copies
         # End of dispatcher registration
         ######################################
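With this registration the CUDA target now handles reallocation for both storage types whose buffers live behind the GPU runtime. Presumably the CPU code generator claims CPU_Heap through the same mechanism (a hypothetical mirror of the call above, not shown in this diff):

    # Assumed counterpart in the CPU target, not part of this commit:
    dispatcher.register_reallocate_dispatcher(dtypes.StorageType.CPU_Heap, self)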
@@ -606,11 +607,12 @@ def allocate_array(self, sdfg: SDFG, cfg: ControlFlowRegion, dfg: StateSubgraphV
         arrsize = nodedesc.total_size
         is_dynamically_sized = symbolic.issymbolic(arrsize, sdfg.constants)
         arrsize_malloc = '%s * sizeof(%s)' % (sym2cpp(arrsize), nodedesc.dtype.ctype)
 
         ctypedef = '%s *' % nodedesc.dtype.ctype
+        deferred_allocation = any([s for s in nodedesc.shape if "__dace_defer" in str(s)])
 
         # Different types of GPU arrays
-        if nodedesc.storage == dtypes.StorageType.GPU_Global:
+        if nodedesc.storage in dtypes.REALLOCATABLE_STORAGES:
             if not declared:
                 declaration_stream.write('%s %s;\n' % (ctypedef, dataname))
                 self._dispatcher.defined_vars.add(dataname, DefinedType.Pointer, ctypedef)
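The deferred_allocation flag marks arrays whose shape still contains the __dace_defer placeholder at code-generation time; for such arrays the initial allocation is presumably skipped so the pointer stays null until the first size write triggers the reallocate path below. A toy illustration of the detection expression (same idiom as the line above):

    # Illustrative only: detect a deferred dimension in a shape tuple.
    shape = ('__dace_defer', 16)
    deferred_allocation = any([s for s in shape if "__dace_defer" in str(s)])
    assert deferred_allocation is True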
@@ -2808,27 +2810,43 @@ def reallocate(
         tmp_storage_name = "__tmp_realloc_move_storage"
 
         callsite_stream.write(f"if ({dst_node.data} == nullptr) {{", cfg, state_id, dst_node.guid)
-        self._alloc_gpu_global(dst_node, data, callsite_stream, data_name, new_size_str)
+        if data.storage == dtypes.StorageType.GPU_Global:
+            self._alloc_gpu_global(dst_node, data, callsite_stream, data_name, new_size_str)
+        else:
+            assert data.storage == dtypes.StorageType.CPU_Pinned
+            callsite_stream.write(f"DACE_GPU_CHECK({self.backend}MallocHost(reinterpret_cast<void**>(&{data_name}), {new_size_str}));", cfg, state_id, dst_node.guid)
         callsite_stream.write("} else {\n", cfg, state_id, dst_node.guid)
         callsite_stream.write(f"{dtype}* {tmp_storage_name};")
-        self._alloc_gpu_global(None, data, callsite_stream, tmp_storage_name, new_size_str)
+        if data.storage == dtypes.StorageType.GPU_Global:
+            self._alloc_gpu_global(None, data, callsite_stream, tmp_storage_name, new_size_str)
+        else:
+            assert data.storage == dtypes.StorageType.CPU_Pinned
+            callsite_stream.write(f"DACE_GPU_CHECK({self.backend}MallocHost(reinterpret_cast<void**>(&{tmp_storage_name}), {new_size_str}));", cfg, state_id, dst_node.guid)
 
         s = ""
-        if not data.pool:  # If pooled, will be freed somewhere else
-            copy_size_str = f"Min({old_size_str}, {new_size_str})"
-            s += f"DACE_GPU_CHECK({self.backend}Memcpy(static_cast<void *>({tmp_storage_name}), static_cast<void *>({data_name}), {copy_size_str}, cudaMemcpyDeviceToDevice));\n"
-            s += f"DACE_GPU_CHECK({self.backend}Free({data_name}));\n"
-            s += f"{data_name} = {tmp_storage_name};\n"
-        else:
-            cudastream = getattr(dst_node, '_cuda_stream', 'nullptr')
-            if cudastream != 'nullptr':
-                cudastream = f'__state->gpu_context->streams[{cudastream}]'
-            s += f'DACE_GPU_CHECK({self.backend}MallocAsync(static_cast<void**>(&{data_name}), {new_size_str}, {cudastream}));\n'
-            s += f"DACE_GPU_CHECK({self.backend}MemcpyAsync(static_cast<void *>({tmp_storage_name}), static_cast<void *>({data_name}), {copy_size_str}, {cudastream}), cudaMemcpyDeviceToDevice));\n"
-            s += f"DACE_GPU_CHECK({self.backend}FreeAsync({data_name}, {cudastream}));\n"
-            callsite_stream.write(s)
-            self._emit_sync(callsite_stream)
-            callsite_stream.write(f"{data_name} = {tmp_storage_name};\n")
-            s = ""
+        copy_size_str = f"Min({old_size_str}, {new_size_str})"
+        if data.storage == dtypes.StorageType.GPU_Global:
+            if not data.pool:  # If pooled, will be freed somewhere else
+                s += f"DACE_GPU_CHECK({self.backend}Memcpy(static_cast<void *>({tmp_storage_name}), static_cast<void *>({data_name}), {copy_size_str}, cudaMemcpyDeviceToDevice));\n"
+                s += f"DACE_GPU_CHECK({self.backend}Free({data_name}));\n"
+                s += f"{data_name} = {tmp_storage_name};\n"
+            else:
+                cudastream = getattr(dst_node, '_cuda_stream', 'nullptr')
+                if cudastream != 'nullptr':
+                    cudastream = f'__state->gpu_context->streams[{cudastream}]'
+                s += f'DACE_GPU_CHECK({self.backend}MallocAsync(reinterpret_cast<void**>(&{tmp_storage_name}), {new_size_str}, {cudastream}));\n'
+                s += f"DACE_GPU_CHECK({self.backend}MemcpyAsync(static_cast<void *>({tmp_storage_name}), static_cast<void *>({data_name}), {copy_size_str}, cudaMemcpyDeviceToDevice, {cudastream}));\n"
+                s += f"DACE_GPU_CHECK({self.backend}FreeAsync({data_name}, {cudastream}));\n"
+                callsite_stream.write(s)
+                self._emit_sync(callsite_stream)
+                callsite_stream.write(f"{data_name} = {tmp_storage_name};\n")
+                s = ""
+        elif data.storage == dtypes.StorageType.CPU_Pinned:
+            s += f"DACE_GPU_CHECK({self.backend}Memcpy(static_cast<void *>({tmp_storage_name}), static_cast<void *>({data_name}), {copy_size_str}, cudaMemcpyHostToHost));\n"
+            s += f"DACE_GPU_CHECK({self.backend}FreeHost({data_name}));\n"
+            s += f"{data_name} = {tmp_storage_name};\n"
+        else:
+            raise Exception("Realloc in CUDA: storage type must be CPU_Pinned or GPU_Global")
         s += "}\n"
         callsite_stream.write(s)
 
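Both storage types follow the same allocate-copy-free-swap scheme; only the allocator and the memcpy kind differ (cudaMalloc with cudaMemcpyDeviceToDevice for GPU_Global, cudaMallocHost with cudaMemcpyHostToHost for CPU_Pinned). A minimal Python sketch of the behavior the emitted C++ follows (illustrative only; alloc, copy, and free stand in for the backend calls):

    def realloc_like_generated_code(old_buf, old_size, new_size, alloc, copy, free):
        # First allocation: the data pointer is still null, so just allocate.
        if old_buf is None:
            return alloc(new_size)
        # Otherwise: allocate a temporary, preserve the overlapping bytes,
        # release the old buffer, and hand back the new pointer for the swap.
        tmp = alloc(new_size)
        copy(tmp, old_buf, min(old_size, new_size))
        free(old_buf)
        return tmp

The pooled GPU_Global path additionally routes through the stream-ordered MallocAsync/FreeAsync variants and synchronizes before swapping the pointer.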
8 changes: 4 additions & 4 deletions dace/sdfg/validation.py
@@ -359,9 +359,9 @@ def validate_sdfg(sdfg: 'dace.sdfg.SDFG', references: Set[int] = None, **context
                     f"Deferred arrays can't be returned. {desc} has __return in its name."
                     , sdfg, None
                 )
-                if desc.storage is not dtypes.StorageType.GPU_Global and desc.storage is not dtypes.StorageType.CPU_Heap:
+                if desc.storage not in dtypes.REALLOCATABLE_STORAGES:
                     raise InvalidSDFGError(
-                        f"Deferred arrays are supported only for {dtypes.StorageType.GPU_Global} and {dtypes.StorageType.CPU_Heap} storage types for {desc}."
+                        f"Deferred arrays are supported only for {dtypes.REALLOCATABLE_STORAGES} storage types for {desc}."
                     , sdfg, None
                     )
 
@@ -616,10 +616,10 @@ def validate_state(state: 'dace.sdfg.SDFGState',
         # Reading or writing an array's size is valid only if the array is transient and uses a reallocatable storage type
         has_writes = len(write_size_edges) > 0
         has_writes_or_reads = len(read_size_edges) + len(write_size_edges) > 0
-        size_access_allowed = arr.transient and (arr.storage == dtypes.StorageType.CPU_Heap or arr.storage == dtypes.StorageType.GPU_Global)
+        size_access_allowed = arr.transient and (arr.storage in dtypes.REALLOCATABLE_STORAGES)
         if has_writes_or_reads and not size_access_allowed:
             raise InvalidSDFGNodeError('Reading the size of an array, or changing (writing to) the size of an array '
-                                       'is only valid if the array is transient and the storage is CPU_Heap or GPU_Global', sdfg, state_id, nid)
+                                       f'is only valid if the array is transient and the storage is in {dtypes.REALLOCATABLE_STORAGES}', sdfg, state_id, nid)
         if has_writes and scope[node] is not None:
             raise InvalidSDFGNodeError('Resizing array is not allowed within a scope (e.g. not inside maps)', sdfg, state_id, nid)
 
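Taken together, the two validation checks gate the deferred-size feature: an array whose shape contains the __dace_defer placeholder must be transient and use a reallocatable storage, and only such arrays may read or write their size. A condensed restatement of the predicate (a sketch mirroring the code above, not a new API):

    def _deferred_size_access_ok(desc) -> bool:
        # Transient + reallocatable storage (CPU_Heap, GPU_Global, CPU_Pinned)
        return desc.transient and desc.storage in dtypes.REALLOCATABLE_STORAGES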
77 changes: 70 additions & 7 deletions tests/deferred_alloc_test.py
@@ -5,7 +5,7 @@
 import pytest
 
 
-@pytest.fixture(params=[dace.dtypes.StorageType.CPU_Heap, dace.dtypes.StorageType.GPU_Global])
+@pytest.fixture(params=[dace.dtypes.StorageType.CPU_Heap, dace.dtypes.StorageType.GPU_Global, dace.dtypes.StorageType.CPU_Pinned])
 def storage_type(request):
     return request.param
 
@@ -19,6 +19,8 @@ def schedule_type(storage_type):
         return dace.dtypes.ScheduleType.Sequential
     elif storage_type == dace.dtypes.StorageType.GPU_Global:
         return dace.dtypes.ScheduleType.GPU_Device
+    elif storage_type == dace.dtypes.StorageType.CPU_Pinned:
+        return dace.dtypes.ScheduleType.Sequential
 
 def _get_trivial_alloc_sdfg(storage_type: dace.dtypes.StorageType, transient: bool, write_size="0:2"):
     sdfg = dace.sdfg.SDFG(name=f"deferred_alloc_test_1")
@@ -77,6 +79,8 @@ def _get_assign_map_sdfg(storage_type: dace.dtypes.StorageType, transient: bool,
         assert (schedule_type == dace.dtypes.ScheduleType.Sequential or schedule_type == dace.dtypes.ScheduleType.CPU_Multicore)
     elif storage_type == dace.dtypes.StorageType.GPU_Global:
         assert (schedule_type == dace.dtypes.ScheduleType.GPU_Device)
+    elif storage_type == dace.dtypes.StorageType.CPU_Pinned:
+        assert (schedule_type == dace.dtypes.ScheduleType.Sequential)
 
     an_3.add_out_connector('_read_size')
     map_entry2, map_exit2 = state.add_map(name="map2",ndrange={"i":dace.subsets.Range([(0,15-1,1)]),"j":dace.subsets.Range([(0,"__A_dim1_size-1", 1)])},
@@ -101,7 +105,7 @@ def _get_assign_map_sdfg(storage_type: dace.dtypes.StorageType, transient: bool,
     return sdfg
 
 def _valid_to_reallocate(transient, storage_type):
-    return transient and (storage_type == dace.dtypes.StorageType.GPU_Global or storage_type == dace.dtypes.StorageType.CPU_Heap)
+    return transient and (storage_type in dace.dtypes.REALLOCATABLE_STORAGES)
 
 def _test_trivial_realloc(storage_type: dace.dtypes.StorageType, transient: bool):
     sdfg = _get_trivial_alloc_sdfg(storage_type, transient)
@@ -138,12 +142,12 @@ def _test_realloc_use(storage_type: dace.dtypes.StorageType, transient: bool, sc
         raise AssertionError("Realloc-use with non-transient data did not fail when it was expected to.")
 
     compiled_sdfg = sdfg.compile()
-    if storage_type == dace.dtypes.StorageType.CPU_Heap:
+    if storage_type == dace.dtypes.StorageType.CPU_Heap or storage_type == dace.dtypes.StorageType.CPU_Pinned:
         arr = numpy.array([-1.0]).astype(numpy.float32)
         user_size = numpy.array([10, 10]).astype(numpy.uint64)
         compiled_sdfg(user_size=user_size, example_array=arr)
         assert ( arr[0] == 3.0 )
-    if storage_type == dace.dtypes.StorageType.GPU_Global:
+    elif storage_type == dace.dtypes.StorageType.GPU_Global:
         try:
             import cupy
         except Exception:
@@ -158,12 +162,12 @@ def _test_realloc_use(storage_type: dace.dtypes.StorageType, transient: bool, sc
     sdfg.apply_transformations_repeated([StateFusion, RedundantArray, RedundantSecondArray])
     sdfg.validate()
     compiled_sdfg = sdfg.compile()
-    if storage_type == dace.dtypes.StorageType.CPU_Heap:
+    if storage_type == dace.dtypes.StorageType.CPU_Heap or storage_type == dace.dtypes.StorageType.CPU_Pinned:
         arr = numpy.array([-1.0]).astype(numpy.float32)
         user_size = numpy.array([10, 10]).astype(numpy.uint64)
         compiled_sdfg(user_size=user_size, example_array=arr)
         assert ( arr[0] == 3.0 )
-    if storage_type == dace.dtypes.StorageType.GPU_Global:
+    elif storage_type == dace.dtypes.StorageType.GPU_Global:
         try:
             import cupy
         except Exception:
@@ -181,10 +185,18 @@ def test_realloc_use_gpu(transient: bool):
 def test_realloc_use_cpu(transient: bool):
     _test_realloc_use(dace.dtypes.StorageType.CPU_Heap, transient, dace.dtypes.ScheduleType.Sequential)
 
+@pytest.mark.gpu
+def test_realloc_use_cpu_pinned(transient: bool):
+    _test_realloc_use(dace.dtypes.StorageType.CPU_Pinned, transient, dace.dtypes.ScheduleType.Sequential)
+
 @pytest.mark.gpu
 def test_trivial_realloc_gpu(transient: bool):
     _test_trivial_realloc(dace.dtypes.StorageType.GPU_Global, transient)
 
+@pytest.mark.gpu
+def test_trivial_realloc_cpu_pinned(transient: bool):
+    _test_trivial_realloc(dace.dtypes.StorageType.CPU_Pinned, transient)
+
 def test_trivial_realloc_cpu(transient: bool):
     _test_trivial_realloc(dace.dtypes.StorageType.CPU_Heap, transient)
 
@@ -220,6 +232,17 @@ def test_realloc_inside_map_gpu():
 
     pytest.fail("Realloc-use with non-transient data and incomplete write did not fail when it was expected to.")
 
+def test_realloc_inside_map_cpu_pinned():
+    sdfg = _get_assign_map_sdfg(dace.dtypes.StorageType.CPU_Pinned, True, dace.dtypes.ScheduleType.Sequential)
+    _add_realloc_inside_map(sdfg, dace.dtypes.ScheduleType.Sequential)
+    try:
+        sdfg.validate()
+    except Exception:
+        return
+
+    pytest.fail("Realloc inside a map scope did not fail validation when it was expected to.")
+
+
 def test_realloc_inside_map_cpu():
     sdfg = _get_assign_map_sdfg(dace.dtypes.StorageType.CPU_Heap, True, dace.dtypes.ScheduleType.CPU_Multicore)
     _add_realloc_inside_map(sdfg, dace.dtypes.ScheduleType.CPU_Multicore)
@@ -275,6 +298,8 @@ def _get_conditional_alloc_sdfg(storage_type: dace.dtypes.StorageType, transient
         assert (schedule_type == dace.dtypes.ScheduleType.Sequential or schedule_type == dace.dtypes.ScheduleType.CPU_Multicore)
     elif storage_type == dace.dtypes.StorageType.GPU_Global:
         assert (schedule_type == dace.dtypes.ScheduleType.GPU_Device)
+    elif storage_type == dace.dtypes.StorageType.CPU_Pinned:
+        assert (schedule_type == dace.dtypes.ScheduleType.Sequential)
 
     an_3 = state.add_access('A')
     an_3.add_out_connector('_read_size')
@@ -321,6 +346,7 @@ def _get_conditional_alloc_sdfg(storage_type: dace.dtypes.StorageType, transient
 
     return sdfg
 
+@pytest.mark.gpu
 def test_conditional_alloc_gpu():
     sdfg = _get_conditional_alloc_sdfg(dace.dtypes.StorageType.GPU_Global, True, dace.dtypes.ScheduleType.GPU_Device)
     sdfg.validate()
@@ -335,6 +361,16 @@ def test_conditional_alloc_gpu():
     sdfg(path=1, size1=size1, size2=size2, example_array=arr)
     assert ( arr.get()[0] == 3.0 )
 
+@pytest.mark.gpu
+def test_conditional_alloc_cpu_pinned():
+    sdfg = _get_conditional_alloc_sdfg(dace.dtypes.StorageType.CPU_Pinned, True, dace.dtypes.ScheduleType.Sequential)
+    sdfg.validate()
+    size1 = numpy.array([1, 1]).astype(numpy.uint64)
+    size2 = numpy.array([22, 22]).astype(numpy.uint64)
+    arr = numpy.array([-1.0]).astype(numpy.float32)
+    sdfg(path=1, size1=size1, size2=size2, example_array=arr)
+    assert ( arr[0] == 3.0 )
+
 def test_conditional_alloc_cpu():
     sdfg = _get_conditional_alloc_sdfg(dace.dtypes.StorageType.CPU_Heap, True, dace.dtypes.ScheduleType.CPU_Multicore)
     sdfg.validate()
@@ -344,6 +380,7 @@ def test_conditional_alloc_cpu():
     sdfg(path=0, size1=size1, size2=size2, example_array=arr)
     assert ( arr[0] == 3.0 )
 
+@pytest.mark.gpu
 def test_conditional_alloc_with_expr_gpu():
     sdfg = _get_conditional_alloc_sdfg(dace.dtypes.StorageType.GPU_Global, True, dace.dtypes.ScheduleType.GPU_Device, True)
     sdfg.validate()
@@ -358,6 +395,16 @@ def test_conditional_alloc_with_expr_gpu():
     sdfg(path=1, size1=size1, size2=size2, example_array=arr)
     assert ( arr.get()[0] == 3.0 )
 
+@pytest.mark.gpu
+def test_conditional_alloc_with_expr_cpu_pinned():
+    sdfg = _get_conditional_alloc_sdfg(dace.dtypes.StorageType.CPU_Pinned, True, dace.dtypes.ScheduleType.Sequential, True)
+    sdfg.validate()
+    size1 = numpy.array([1, 1]).astype(numpy.uint64)
+    size2 = numpy.array([22, 22]).astype(numpy.uint64)
+    arr = numpy.array([-1.0]).astype(numpy.float32)
+    sdfg(path=1, size1=size1, size2=size2, example_array=arr)
+    assert ( arr[0] == 3.0 )
+
 def test_conditional_alloc_with_expr_cpu():
     sdfg = _get_conditional_alloc_sdfg(dace.dtypes.StorageType.CPU_Heap, True, dace.dtypes.ScheduleType.CPU_Multicore, True)
     sdfg.validate()
@@ -391,24 +438,36 @@ def test_incomplete_write_dimensions_2():
     test_realloc_inside_map_cpu()
     print("Trivial Realloc within map, gpu")
     test_realloc_inside_map_gpu()
+    print("Trivial Realloc within map, cpu pinned")
+    test_realloc_inside_map_cpu_pinned()
 
     print("Trivial Realloc with storage, cpu")
     test_trivial_realloc_cpu(True)
     print("Trivial Realloc-Use with storage, cpu")
     test_realloc_use_cpu(True)
+    print("Trivial Realloc-Use with storage, cpu pinned")
+    test_realloc_use_cpu_pinned(True)
 
     print("Trivial Realloc with storage, gpu")
     test_trivial_realloc_gpu(True)
     print("Trivial Realloc-Use with storage, gpu")
     test_realloc_use_gpu(True)
+    print("Trivial Realloc-Use with storage, cpu pinned")
+    test_realloc_use_cpu_pinned(True)
 
     print("Trivial Realloc with storage, cpu, on non-transient data")
     test_trivial_realloc_cpu(False)
     print("Trivial Realloc-Use with storage, cpu, on non-transient data")
     test_realloc_use_cpu(False)
     print("Trivial Realloc with storage, gpu, on non-transient data")
     test_trivial_realloc_gpu(False)
 
     print("Trivial Realloc-Use with storage, gpu, on non-transient data")
     test_realloc_use_gpu(False)
+    print("Trivial Realloc with storage, cpu pinned, on non-transient data")
+    test_trivial_realloc_cpu_pinned(False)
+    print("Trivial Realloc-Use with storage, cpu pinned, on non-transient data")
+    test_realloc_use_cpu_pinned(False)
 
     print("Realloc with incomplete write one, validation")
     test_incomplete_write_dimensions_1()
@@ -419,8 +478,12 @@ def test_incomplete_write_dimensions_2():
     test_conditional_alloc_cpu()
     print("Test conditional alloc with use, gpu")
     test_conditional_alloc_gpu()
+    print("Test conditional alloc with use, cpu pinned")
+    test_conditional_alloc_cpu_pinned()
 
     print("Test conditional alloc with use and the shape as a non-trivial expression, cpu")
     test_conditional_alloc_with_expr_cpu()
     print("Test conditional alloc with use and the shape as a non-trivial expression, gpu")
     test_conditional_alloc_with_expr_gpu()
+    print("Test conditional alloc with use and the shape as a non-trivial expression, cpu pinned")
+    test_conditional_alloc_with_expr_cpu_pinned()
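The __main__ block mirrors the pytest entry points for manual runs; a transient fixture parameterized over True/False is presumably defined in an elided part of the file. Since the CPU_Pinned variants carry the gpu marker (pinned allocation still requires the CUDA/HIP runtime), a GPU-equipped run would select them with something like pytest tests/deferred_alloc_test.py -m gpu -k cpu_pinned (an assumed invocation; the marker wiring depends on the project's pytest configuration).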
