From fa305d2f4305d28144090d7a317c546157b1f343 Mon Sep 17 00:00:00 2001 From: Tal Ben-Nun Date: Fri, 5 Jan 2024 15:59:14 -0800 Subject: [PATCH] CuPy fixes and special cases for HIP (#1492) CuPy with AMD HIP does not support `__cuda_array_interface__`. This PR adds special cases to support CuPy with HIP. The PR also fixes the reduce node's GPUAuto schedule with respect to warp size. --- dace/codegen/compiled_sdfg.py | 10 ++++++- dace/data.py | 10 +++++-- dace/dtypes.py | 7 +++++ dace/libraries/standard/nodes/reduce.py | 31 +++++++++++---------- dace/libraries/standard/nodes/ttranspose.py | 5 +++- dace/runtime/include/dace/reduction.h | 20 +++++++++++++ 6 files changed, 65 insertions(+), 18 deletions(-) diff --git a/dace/codegen/compiled_sdfg.py b/dace/codegen/compiled_sdfg.py index fdf68214ed..ff7bc6084e 100644 --- a/dace/codegen/compiled_sdfg.py +++ b/dace/codegen/compiled_sdfg.py @@ -159,8 +159,16 @@ def _array_interface_ptr(array: Any, storage: dtypes.StorageType) -> int: """ if hasattr(array, 'data_ptr'): return array.data_ptr() + if storage == dtypes.StorageType.GPU_Global: - return array.__cuda_array_interface__['data'][0] + try: + return array.__cuda_array_interface__['data'][0] + except AttributeError: + # Special case for CuPy with HIP + if hasattr(array, 'data') and hasattr(array.data, 'ptr'): + return array.data.ptr + raise + return array.__array_interface__['data'][0] diff --git a/dace/data.py b/dace/data.py index 199e7dabd4..cceaa4139c 100644 --- a/dace/data.py +++ b/dace/data.py @@ -73,9 +73,15 @@ def create_datadescriptor(obj, no_custom_desc=False): else: dtype = dtypes.typeclass(obj.dtype.type) return Array(dtype=dtype, strides=tuple(s // obj.itemsize for s in obj.strides), shape=obj.shape) - # special case for torch tensors. Maybe __array__ could be used here for a more - # general solution, but torch doesn't support __array__ for cuda tensors. + elif type(obj).__module__ == "cupy" and type(obj).__name__ == "ndarray": + # special case for CuPy and HIP, which does not support __cuda_array_interface__ + storage = dtypes.StorageType.GPU_Global + dtype = dtypes.typeclass(obj.dtype.type) + itemsize = obj.itemsize + return Array(dtype=dtype, shape=obj.shape, strides=tuple(s // itemsize for s in obj.strides), storage=storage) elif type(obj).__module__ == "torch" and type(obj).__name__ == "Tensor": + # special case for torch tensors. Maybe __array__ could be used here for a more + # general solution, but torch doesn't support __array__ for cuda tensors. try: # If torch is importable, define translations between typeclasses and torch types. These are reused by daceml. # conversion happens here in pytorch: diff --git a/dace/dtypes.py b/dace/dtypes.py index f3ccddbfb7..a890668595 100644 --- a/dace/dtypes.py +++ b/dace/dtypes.py @@ -1557,6 +1557,8 @@ def is_array(obj: Any) -> bool: return hasattr(obj, 'shape') and len(obj.shape) > 0 except TypeError: # PyTorch scalar objects define an attribute called shape that cannot be used return False + if hasattr(obj, 'data') and hasattr(obj.data, 'ptr'): # CuPy special case with HIP + return True return False @@ -1577,4 +1579,9 @@ def is_gpu_array(obj: Any) -> bool: # In PyTorch, accessing this attribute throws a runtime error for # variables that require grad, or KeyError when a boolean array is used return False + + if hasattr(obj, 'data') and hasattr(obj.data, 'ptr'): # CuPy special case with HIP + if hasattr(obj, 'device') and getattr(obj.device, 'id', -1) >= 0: + return True + return False diff --git a/dace/libraries/standard/nodes/reduce.py b/dace/libraries/standard/nodes/reduce.py index dd026ea62c..4e04a656fe 100644 --- a/dace/libraries/standard/nodes/reduce.py +++ b/dace/libraries/standard/nodes/reduce.py @@ -1088,7 +1088,7 @@ class ExpandReduceGPUAuto(pm.ExpandTransformation): """ GPU implementation of the reduce node. This expansion aims to map the reduction inputs to an optimal GPU schedule. """ - environments = [CUDA] + environments = [] @staticmethod def expansion(node: 'Reduce', state: SDFGState, sdfg: SDFG): @@ -1099,6 +1099,8 @@ def expansion(node: 'Reduce', state: SDFGState, sdfg: SDFG): :param state: the state in which the node is in :param sdfg: the SDFG in which the node is in """ + from dace.codegen import common + node.validate(sdfg, state) inedge: graph.MultiConnectorEdge = state.in_edges(node)[0] outedge: graph.MultiConnectorEdge = state.out_edges(node)[0] @@ -1106,6 +1108,7 @@ def expansion(node: 'Reduce', state: SDFGState, sdfg: SDFG): isqdim = insubset.squeeze() raw_input_data = sdfg.arrays[inedge.data.data] raw_output_data = sdfg.arrays[outedge.data.data] + warp_size = 64 if common.get_gpu_backend() == 'hip' else 32 in_type = raw_input_data.dtype @@ -1132,7 +1135,7 @@ def expansion(node: 'Reduce', state: SDFGState, sdfg: SDFG): axes = [axis for axis in axes if axis in isqdim] # call the planner script - schedule = red_planner.get_reduction_schedule(raw_input_data, axes) + schedule = red_planner.get_reduction_schedule(raw_input_data, axes, warp_size=warp_size) if schedule.error: # return pure expansion if error @@ -1340,25 +1343,25 @@ def expansion(node: 'Reduce', state: SDFGState, sdfg: SDFG): real_state = nested_sdfg.add_state('real_state') nested_sdfg.add_edge(start_state, real_state, - dace.InterstateEdge(f'_b1 + 32 * _g < {schedule.in_shape[-1]}')) + dace.InterstateEdge(f'_b1 + {warp_size} * _g < {schedule.in_shape[-1]}')) reset_outm = dace.Memlet(f'_out[{",".join(["_o%d" % i for i in range(len(schedule.out_shape))])}]') if len(schedule.out_shape) > 1: outm = dace.Memlet( - f'_out[{",".join(["_o%d" % i for i in range(len(schedule.out_shape) - 1)])},_g * 32 + _b]', + f'_out[{",".join(["_o%d" % i for i in range(len(schedule.out_shape) - 1)])},_g * {warp_size} + _b]', dynamic=True) outm_wcr = dace.Memlet( - f'_out[{",".join(["_o%d" % i for i in range(len(schedule.out_shape) - 1)])},_g * 32 + _b]', + f'_out[{",".join(["_o%d" % i for i in range(len(schedule.out_shape) - 1)])},_g * {warp_size} + _b]', dynamic=True, wcr=node.wcr) else: - outm = dace.Memlet(f'_out[_g * 32 + _b]', dynamic=True) - outm_wcr = dace.Memlet(f'_out[_g * 32 + _b]', dynamic=True, wcr=node.wcr) + outm = dace.Memlet(f'_out[_g * {warp_size} + _b]', dynamic=True) + outm_wcr = dace.Memlet(f'_out[_g * {warp_size} + _b]', dynamic=True, wcr=node.wcr) input_subset = input_subset[:-2] input_subset.append(f'0:{schedule.sequential[0]}') - input_subset.append('_g * 32 + _b1') + input_subset.append(f'_g * {warp_size} + _b1') inmm = dace.Memlet(f'_in[{",".join(input_subset)}]', dynamic=True) if schedule.multi_axes: @@ -1401,13 +1404,13 @@ def expansion(node: 'Reduce', state: SDFGState, sdfg: SDFG): schedule=dtypes.ScheduleType.GPU_ThreadBlock) else: - bme1, bmx1 = nstate.add_map('block', {'_b': f'0:32'}, schedule=dtypes.ScheduleType.GPU_ThreadBlock) + bme1, bmx1 = nstate.add_map('block', {'_b': f'0:{warp_size}'}, schedule=dtypes.ScheduleType.GPU_ThreadBlock) bme2, bmx2 = nstate.add_map('block', {f'_b{i}': f'0:{sz}' for i, sz in enumerate(schedule.block)}, schedule=dtypes.ScheduleType.GPU_ThreadBlock) - # add shared memory of size 32 to outer sdfg + # add shared memory of warp size to outer sdfg nsdfg.add_array('s_mem', [schedule.shared_mem_size], nsdfg.arrays['_in'].dtype, dtypes.StorageType.GPU_Shared, @@ -1482,11 +1485,11 @@ def expansion(node: 'Reduce', state: SDFGState, sdfg: SDFG): if mini_warps: cond_tasklet = nstate.add_tasklet( 'cond_write', {'_input'}, {'_output'}, - f'if _b + 32 * _g < {schedule.out_shape[-1]} and _bb == 0 and _mwid == 0: _output = _input') + f'if _b + {warp_size} * _g < {schedule.out_shape[-1]} and _bb == 0 and _mwid == 0: _output = _input') else: cond_tasklet = nstate.add_tasklet( 'cond_write', {'_input'}, {'_output'}, - f'if _b + 32 * _g < {schedule.out_shape[-1]} and _bb == 0: _output = _input') + f'if _b + {warp_size} * _g < {schedule.out_shape[-1]} and _bb == 0: _output = _input') # connect accumulator to identity tasklet real_state.add_memlet_path(accread, ime, id, dst_conn='a', memlet=dace.Memlet('acc[0]')) @@ -1511,8 +1514,8 @@ def expansion(node: 'Reduce', state: SDFGState, sdfg: SDFG): nstate.add_memlet_path(s_mem3, bme3, cond_tasklet, dst_conn='_input', memlet=dace.Memlet('s_mem[_b]')) else: bme3, bmx3 = nstate.add_map('block', { - '_bb': '0:16', - '_b': f'0:32' + '_bb': f'0:{512//warp_size}', + '_b': f'0:{warp_size}' }, schedule=dtypes.ScheduleType.GPU_ThreadBlock) nstate.add_memlet_path(s_mem3, bme3, cond_tasklet, dst_conn='_input', memlet=dace.Memlet('s_mem[_b]')) diff --git a/dace/libraries/standard/nodes/ttranspose.py b/dace/libraries/standard/nodes/ttranspose.py index e11012e3ad..6d142db81f 100644 --- a/dace/libraries/standard/nodes/ttranspose.py +++ b/dace/libraries/standard/nodes/ttranspose.py @@ -38,7 +38,10 @@ def expansion(node, parent_state, parent_sdfg): out_mem = dace.Memlet(expr=f"_out_tensor[{','.join([map_params[i] for i in node.axes])}]") inputs = {"_inp": inp_mem} outputs = {"_out": out_mem} - code = f"_out = {node.alpha} * _inp" + if node.alpha == 1: + code = "_out = _inp" + else: + code = f"_out = decltype(_inp)({node.alpha}) * _inp" if node.beta != 0: inputs["_inout"] = out_mem code = f"_out = {node.alpha} * _inp + {node.beta} * _inout" diff --git a/dace/runtime/include/dace/reduction.h b/dace/runtime/include/dace/reduction.h index 9d8c59997c..927bf449de 100644 --- a/dace/runtime/include/dace/reduction.h +++ b/dace/runtime/include/dace/reduction.h @@ -592,7 +592,9 @@ namespace dace { cub::TransformInputIterator itr(counting_iterator, conversion_op); return itr; } +#endif +#if defined(__CUDACC__) template struct warpReduce { static DACE_DFI T reduce(T v) @@ -610,6 +612,24 @@ namespace dace { return v; } }; +#elif defined(__HIPCC__) + template + struct warpReduce { + static DACE_DFI T reduce(T v) + { + for (int i = 1; i < warpSize; i = i * 2) + v = _wcr_fixed()(v, __shfl_xor(v, i)); + return v; + } + + template + static DACE_DFI T mini_reduce(T v) + { + for (int i = 1; i < NUM_MW; i = i * 2) + v = _wcr_fixed()(v, __shfl_xor(v, i)); + return v; + } + }; #endif } // namespace dace