llvm: Use numpy array for dynamically sized output arguments (#3033)
Use numpy arrays for "run()" results.
Use numpy arrays for "evaluate()" results.
Simplify GridSearch compiled search function invocation.
jvesely authored Aug 21, 2024
2 parents 7047302 + 7267a1a commit 2d0899e
Showing 3 changed files with 46 additions and 59 deletions.
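Background for the diffs below: output arguments whose leading dimension is only known at call time (number of runs, number of evaluations) are now bound with np.ctypeslib.ndpointer using a fixed rank (ndim) instead of a fixed shape, so any C-contiguous numpy buffer with one extra, caller-sized dimension is accepted directly. A minimal standalone sketch of the two binding styles (illustrative only, not PsyNeuLink code):

import numpy as np
from numpy.ctypeslib import ndpointer

# Fixed-shape binding: accepts only arrays matching the declared shape exactly.
fixed = ndpointer(dtype=np.float64, shape=(3,), flags='C_CONTIGUOUS')

# Dynamic-size binding: pins dtype and rank but leaves the sizes open,
# so one extra, caller-chosen leading dimension can hold per-run results.
dynamic = ndpointer(dtype=np.float64, ndim=2, flags='C_CONTIGUOUS')

runs = 10
outputs = np.full((runs, 3), np.nan)   # one row per run

fixed.from_param(outputs[0])           # ok: single element, shape (3,)
dynamic.from_param(outputs)            # ok: rank 2, any leading length
try:
    dynamic.from_param(outputs[0])     # rank 1 -> rejected
except TypeError as exc:
    print("rejected:", exc)

from_param is the hook ctypes consults when type-checking arguments, so the same check runs on every compiled-function call.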
@@ -2096,14 +2096,11 @@ def _function(self,
         # if ocm is not None and ocm.parameters.comp_execution_mode._get(context) in {"PTX", "LLVM"}:
         if ocm is not None and ocm.parameters.comp_execution_mode._get(context) in {"PTX", "LLVM"}:
 
-            ct_values = all_values
-            num_values = len(ct_values)
-
             # Reduce array of values to min/max
             # select_min params are:
-            # params, state, min_sample_ptr, sample_ptr, min_value_ptr, value_ptr, opt_count_ptr, count
+            # params, state, min_sample_ptr, sample_ptr, min_value_ptr, value_ptr, opt_count_ptr, start, stop
             min_tags = frozenset({"select_min", "evaluate_type_objective"})
-            bin_func = pnlvm.LLVMBinaryFunction.from_obj(self, tags=min_tags, ctype_ptr_args=(0, 1, 3, 5))
+            bin_func = pnlvm.LLVMBinaryFunction.from_obj(self, tags=min_tags, ctype_ptr_args=(0, 1, 3), dynamic_size_args=(5,))
 
             ct_param = bin_func.byref_arg_types[0](*self._get_param_initializer(context))
             ct_state = bin_func.byref_arg_types[1](*self._get_state_initializer(context))
@@ -2114,15 +2111,12 @@ def _function(self,
             bin_func(ct_param,
                      ct_state,
                      optimal_sample,
                      None, # samples. NULL, it's generated by the function.
                      optimal_value,
-                     ct_values,
+                     all_values,
                      number_of_optimal_values,
-                     bin_func.c_func.argtypes[7](0), # start
-                     bin_func.c_func.argtypes[8](num_values)) # stop
-
-            # Convert outputs to Numpy/Python
-            all_values = np.ctypeslib.as_array(ct_values)
+                     0, # start
+                     len(all_values)) # stop
 
         # Python version
         else:
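A note on the simplified GridSearch call above: once a foreign function's argtypes are declared, ctypes converts plain Python ints itself, which is why 0 and len(all_values) can replace the bin_func.c_func.argtypes[7](0)-style manual wrapping. A self-contained illustration, using a Python callback as a stand-in for the compiled select_min kernel (names are illustrative):

import ctypes

# Prototype with declared integer parameter types, as the binary function would have.
proto = ctypes.CFUNCTYPE(ctypes.c_uint32, ctypes.c_uint32, ctypes.c_uint32)

# Stand-in for the compiled kernel; the real code dispatches to LLVM-built code.
span = proto(lambda start, stop: stop - start)

print(span(0, 51))                                    # plain ints are converted automatically: 51
print(span(ctypes.c_uint32(0), ctypes.c_uint32(51)))  # explicit wrapping still works, just redundant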
15 changes: 9 additions & 6 deletions psyneulink/core/llvm/__init__.py
@@ -123,7 +123,7 @@ def _llvm_build(target_generation=_binary_generation + 1):
 
 
 class LLVMBinaryFunction:
-    def __init__(self, name: str, *, ctype_ptr_args=()):
+    def __init__(self, name: str, *, ctype_ptr_args:tuple=(), dynamic_size_args:tuple=()):
         self.name = name
 
         self.__c_func = None
@@ -154,7 +154,10 @@ def __init__(self, name: str, *, ctype_ptr_args=()):
 
         for i, arg in enumerate(self.np_arg_dtypes):
             if i not in ctype_ptr_args and self.byref_arg_types[i] is not None:
-                args[i] = np.ctypeslib.ndpointer(dtype=arg.base, shape=arg.shape)
+                if i in dynamic_size_args:
+                    args[i] = np.ctypeslib.ndpointer(dtype=arg.base, ndim=len(arg.shape) + 1, flags='C_CONTIGUOUS')
+                else:
+                    args[i] = np.ctypeslib.ndpointer(dtype=arg.base, shape=arg.shape, flags='C_CONTIGUOUS')
 
         middle = time.perf_counter()
         self.__c_func_type = ctypes.CFUNCTYPE(return_type, *args)
@@ -233,14 +236,14 @@ def np_buffer_for_arg(self, arg_num, *, extra_dimensions=(), fill_value=np.nan):
 
     @staticmethod
     @functools.lru_cache(maxsize=32)
-    def from_obj(obj, *, tags:frozenset=frozenset(), ctype_ptr_args:tuple=()):
+    def from_obj(obj, *, tags:frozenset=frozenset(), ctype_ptr_args:tuple=(), dynamic_size_args:tuple=()):
         name = LLVMBuilderContext.get_current().gen_llvm_function(obj, tags=tags).name
-        return LLVMBinaryFunction.get(name, ctype_ptr_args=ctype_ptr_args)
+        return LLVMBinaryFunction.get(name, ctype_ptr_args=ctype_ptr_args, dynamic_size_args=dynamic_size_args)
 
     @staticmethod
     @functools.lru_cache(maxsize=32)
-    def get(name: str, *, ctype_ptr_args:tuple=()):
-        return LLVMBinaryFunction(name, ctype_ptr_args=ctype_ptr_args)
+    def get(name: str, *, ctype_ptr_args:tuple=(), dynamic_size_args:tuple=()):
+        return LLVMBinaryFunction(name, ctype_ptr_args=ctype_ptr_args, dynamic_size_args=dynamic_size_args)


_cpu_engine = None
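np_buffer_for_arg, named in the hunk header above, is the allocation helper the new call sites rely on. Its body is not part of this diff; judging from how it is used (extra_dimensions prepended to the argument's element shape, NaN fill), it behaves roughly like the sketch below — an assumption for illustration, not the actual implementation:

import numpy as np

def np_buffer_for_arg_sketch(arg_dtype, extra_dimensions=(), fill_value=np.nan):
    """Rough stand-in for LLVMBinaryFunction.np_buffer_for_arg: allocate a
    C-contiguous buffer whose trailing dims match the argument's element type."""
    return np.full(extra_dimensions + arg_dtype.shape, fill_value, dtype=arg_dtype.base)

# Hypothetical per-run output element: 2 output ports of 3 values each.
per_run = np.dtype((np.float64, (2, 3)))

runs = 5
outputs = np_buffer_for_arg_sketch(per_run, extra_dimensions=(runs,))
print(outputs.shape)   # (5, 2, 3) -- one slot per run, filled with NaN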
72 changes: 31 additions & 41 deletions psyneulink/core/llvm/execution.py
@@ -29,19 +29,6 @@
 __all__ = ['CompExecution', 'FuncExecution', 'MechExecution']
 
 
-def _convert_ctype_to_python(x):
-    if isinstance(x, ctypes.Structure):
-        return [_convert_ctype_to_python(getattr(x, field_name)) for field_name, _ in x._fields_]
-    if isinstance(x, ctypes.Array):
-        return [_convert_ctype_to_python(el) for el in x]
-    if isinstance(x, (ctypes.c_double, ctypes.c_float)):
-        return x.value
-    if isinstance(x, (float, int)):
-        return x
-
-    assert False, "Don't know how to convert: {}".format(x)
-
-
 def _tupleize(x):
     try:
         return tuple(_tupleize(y) for y in x)
@@ -557,7 +544,8 @@ def _bin_run_func(self):
         if self.__bin_run_func is None:
             self.__bin_run_func = pnlvm.LLVMBinaryFunction.from_obj(self._composition,
                                                                     tags=self.__tags.union({"run"}),
-                                                                    ctype_ptr_args=(3, 4))
+                                                                    ctype_ptr_args=(3,),
+                                                                    dynamic_size_args=(4,))
 
         return self.__bin_run_func

@@ -572,53 +560,53 @@ def _prepare_run(self, inputs, runs, num_input_sets):
         inputs = self._get_run_input_struct(inputs, num_input_sets)
 
         # Create output buffer
-        outputs = (self._bin_run_func.byref_arg_types[4] * runs)()
+        outputs = self._bin_func.np_buffer_for_arg(4, extra_dimensions=(runs,))
+        assert ctypes.sizeof(self._bin_run_func.byref_arg_types[4]) * runs == outputs.nbytes
 
         if "stat" in self._debug_env:
-            print("Output struct size:", _pretty_size(ctypes.sizeof(outputs)),
-                  "for", self._composition.name)
+            print("Output struct size:", _pretty_size(outputs.nbytes), "for", self._composition.name)
 
         runs_count = np.asarray(runs, dtype=np.uint32).copy()
         input_count = np.asarray(num_input_sets, dtype=np.uint32)
 
         return inputs, outputs, runs_count, input_count
 
     def run(self, inputs, runs, num_input_sets):
-        ct_inputs, ct_outputs, runs_count, input_count = self._prepare_run(inputs, runs, num_input_sets)
+        ct_inputs, outputs, runs_count, input_count = self._prepare_run(inputs, runs, num_input_sets)
 
         self._bin_run_func(self._state_struct,
                            self._param_struct,
                            self._data_struct,
                            ct_inputs,
-                           ct_outputs,
+                           outputs,
                            runs_count,
                            input_count)
 
         # Extract only #trials elements in case the run exited early
         assert runs_count <= runs, "Composition ran more times than allowed!"
-        return _convert_ctype_to_python(ct_outputs)[0:runs_count]
+        return self._get_indexable(outputs[0:runs_count])
 
     def cuda_run(self, inputs, runs, num_input_sets):
-        ct_inputs, ct_outputs, runs_count, input_count = self._prepare_run(inputs, runs, num_input_sets)
+        ct_inputs, outputs, runs_count, input_count = self._prepare_run(inputs, runs, num_input_sets)
 
         self._bin_run_func.cuda_call(self._cuda_state_struct,
                                      self._cuda_param_struct,
                                      self._cuda_data_struct,
                                      jit_engine.pycuda.driver.In(np.ctypeslib.as_array(ct_inputs)),
-                                     jit_engine.pycuda.driver.Out(np.ctypeslib.as_array(ct_outputs)),
+                                     jit_engine.pycuda.driver.Out(outputs),
                                      jit_engine.pycuda.driver.InOut(runs_count),
                                      jit_engine.pycuda.driver.In(input_count))
 
         # Extract only #trials elements in case the run exited early
         assert runs_count <= runs, "Composition ran more times than allowed: {}".format(runs)
-        return _convert_ctype_to_python(ct_outputs)[0:runs_count]
+        return self._get_indexable(outputs[0:runs_count])
 
     def _prepare_evaluate(self, inputs, num_input_sets, num_evaluations, all_results:bool):
         ocm = self._composition.controller
 
         eval_type = "evaluate_type_all_results" if all_results else "evaluate_type_objective"
         tags = {"evaluate", "alloc_range", eval_type}
-        bin_func = pnlvm.LLVMBinaryFunction.from_obj(ocm, tags=frozenset(tags), ctype_ptr_args=(4, 5))
+        bin_func = pnlvm.LLVMBinaryFunction.from_obj(ocm, tags=frozenset(tags), ctype_ptr_args=(5,), dynamic_size_args=(4,))
         self.__bin_func = bin_func
 
         # There are 8 arguments to evaluate_alloc_range:
@@ -635,42 +623,42 @@ def _prepare_evaluate(self, inputs, num_input_sets, num_evaluations, all_results:bool):
         # Construct input variable, the 5th parameter of the evaluate function
         ct_inputs = self._get_run_input_struct(inputs, num_input_sets, 5)
 
-        # Output ctype
-        out_el_ty = bin_func.byref_arg_types[4]
+        # Output buffer
+        extra_dims = (num_evaluations,)
         if all_results:
             num_trials = ocm.parameters.num_trials_per_estimate.get(self._execution_context)
             if num_trials is None:
                 num_trials = num_input_sets
-            out_el_ty *= num_trials
-        out_ty = out_el_ty * num_evaluations
+            assert num_trials is not None
+            extra_dims = extra_dims + (num_trials,)
+
+        outputs = self._bin_func.np_buffer_for_arg(4, extra_dimensions=extra_dims)
 
         num_inputs = np.asarray(num_input_sets, dtype=np.uint32)
         if "stat" in self._debug_env:
             print("Evaluate result struct type size:",
-                  _pretty_size(ctypes.sizeof(out_ty)),
+                  _pretty_size(outputs.nbytes),
                   "( evaluations:", num_evaluations, "element size:", ctypes.sizeof(out_el_ty), ")",
                   "for", self._obj.name)
 
-        return comp_params, comp_state, comp_data, ct_inputs, out_ty(), num_inputs
+        return comp_params, comp_state, comp_data, ct_inputs, outputs, num_inputs

def cuda_evaluate(self, inputs, num_input_sets, num_evaluations, all_results:bool=False):
comp_params, comp_state, comp_data, ct_inputs, ct_results, num_inputs = \
comp_params, comp_state, comp_data, ct_inputs, results, num_inputs = \
self._prepare_evaluate(inputs, num_input_sets, num_evaluations, all_results)

cuda_args = (jit_engine.pycuda.driver.In(comp_params),
jit_engine.pycuda.driver.In(comp_state),
jit_engine.pycuda.driver.Out(np.ctypeslib.as_array(ct_results)), # results
jit_engine.pycuda.driver.Out(results), # results
jit_engine.pycuda.driver.In(np.ctypeslib.as_array(ct_inputs)), # inputs
jit_engine.pycuda.driver.In(comp_data), # composition data
jit_engine.pycuda.driver.In(num_inputs), # number of inputs
)

self.__bin_func.cuda_call(*cuda_args, threads=int(num_evaluations))

return ct_results
return results

def thread_evaluate(self, inputs, num_input_sets, num_evaluations, all_results:bool=False):
comp_params, comp_state, comp_data, ct_inputs, ct_results, num_inputs = \
comp_params, comp_state, comp_data, ct_inputs, outputs, num_inputs = \
self._prepare_evaluate(inputs, num_input_sets, num_evaluations, all_results)

jobs = min(os.cpu_count(), num_evaluations)
@@ -679,19 +667,21 @@ def thread_evaluate(self, inputs, num_input_sets, num_evaluations, all_results:bool=False):
         parallel_start = time.time()
         with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as ex:
 
-            # Create input and result typed casts once, they are the same
-            # for every submitted job.
-            results_arg = ctypes.cast(ct_results, self.__bin_func.c_func.argtypes[4])
+            # Create input typed cast once, it is the same for every submitted job.
             input_arg = ctypes.cast(ct_inputs, self.__bin_func.c_func.argtypes[5])
 
+            # numpy dynamic args expect only one extra dimension
+            output_arg = outputs.reshape(-1, *self.__bin_func.np_arg_dtypes[4].shape)
+            assert output_arg.base is outputs
+
             # There are 8 arguments to evaluate_alloc_range:
             # comp_param, comp_state, from, to, results, input, comp_data, input length
             results = [ex.submit(self.__bin_func,
                                  comp_params,
                                  comp_state,
                                  int(i * evals_per_job),
                                  min((i + 1) * evals_per_job, num_evaluations),
-                                 results_arg,
+                                 output_arg,
                                  input_arg,
                                  comp_data,
                                  num_inputs)
@@ -707,4 +697,4 @@ def thread_evaluate(self, inputs, num_input_sets, num_evaluations, all_results:bool=False):
         exceptions = [r.exception() for r in results]
         assert all(e is None for e in exceptions), "Not all jobs finished successfully: {}".format(exceptions)
 
-        return ct_results
+        return outputs
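The reshape added to thread_evaluate deserves a note: a dynamic binding accepts exactly one extra dimension, so the (num_evaluations, num_trials, ...) buffer used for all_results has to be collapsed, and it must collapse to a view rather than a copy so that worker threads writing through output_arg land in the shared buffer. A small demonstration of that invariant (shapes are made up):

import numpy as np

elem_shape = (4,)                                # hypothetical per-evaluation output shape
outputs = np.full((6, 3) + elem_shape, np.nan)   # (num_evaluations, num_trials) + elem

# Collapse all leading dims into one; on a C-contiguous array reshape
# returns a view, so writes through output_arg fill `outputs` in place.
output_arg = outputs.reshape(-1, *elem_shape)
assert output_arg.base is outputs

output_arg[0] = 1.0          # a "worker" writes one evaluation slice
print(outputs[0, 0])         # [1. 1. 1. 1.] -- visible in the original buffer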
