-
I'm implementing a DAG with Awkward Arrays, storing operations (nodes) and data (edges) as two arrays in a Numba structref, based on the Numba docs.
import awkward as ak
import numba as nb
import numpy as np
import weakref
from numba import njit
from numba.core import types
from numba.experimental import structref
import itertools
import collections
"""Define a DAG graph of operations and data.
Each operation can have 0 or more data as input and 0 or more data as output
Each data can have an optional input operation (which is generating the data), and multiple operations where it is required as input
Challenge: Implement the Data.input() method which returns an Operation or None in numba mode, by hook or by crook (nb.objmode())
"""
# Define a StructRef.
# `structref.register` associates the type with the default data model.
# This will also install getters and setters to the fields of
# the StructRef.
@structref.register
class GraphType(types.StructRef):
    """Numba StructRef type for the graph, registered with the default data model."""

    def preprocess_fields(self, fields):
        """Return ``fields`` as a tuple of ``(name, type)`` pairs without Literal types.

        The type constructor calls this hook for additional preprocessing of
        the fields; every field type is passed through ``types.unliteral`` so
        that the struct never stores Literal types.
        """
        unliteral_fields = []
        for field_name, field_type in fields:
            unliteral_fields.append((field_name, types.unliteral(field_type)))
        return tuple(unliteral_fields)
# Define a Python type that can be used as a proxy to the StructRef
# allocated inside Numba. Users can construct the StructRef via
# the constructor for this type in python code and jit-code.
# Module-level registries used to get back to the graph from its operation
# and data records (the records only carry the integer graph index in their
# behavior dict).
# Note: these are accessed through eval('<name>') rather than by direct
# reference, to avoid pickling errors.
# graph index -> Graph proxy; values are held weakly, so an entry disappears
# once the Graph is no longer referenced elsewhere.
_graph_cache = weakref.WeakValueDictionary()
# Graph proxy (weak key) -> tuple of the Numba types of
# (ops, ops[0], dat, dat[0]), filled in by Graph.__new__.
_graph_types_cache = weakref.WeakKeyDictionary()
# Source of the integer index assigned to each newly created Graph.
_graph_idx_seq = itertools.count()
class Graph(structref.StructRefProxy):
    """StructRef proxy holding the operations (``ops``) and data (``dat``) arrays."""

    def __new__(cls, ops, dat):
        """Wrap ``ops`` and ``dat`` as named Awkward arrays and register the graph.

        Both arrays receive a behavior dict that maps their record name to the
        matching record class and stores this graph's index under ``'graph'``,
        which is how records find their way back to the graph. The ``dat``
        behavior additionally registers the Numba typer and lowering hooks
        for ``Data.input()``.
        """
        graph_idx = next(_graph_idx_seq)

        ops_behavior = {
            'Operation': Operation,
            'graph': graph_idx,
        }
        ops = ak.Array(ops, with_name='Operation', behavior=ops_behavior)

        dat_behavior = {
            'Data': Data,
            'graph': graph_idx,
            ('__numba_typer__', 'Data', 'input', ()): Data._input__numba_typer__,
            ('__numba_lower__', 'Data', 'input', ()): Data._input__numba_lower__,
        }
        dat = ak.Array(dat, with_name='Data', behavior=dat_behavior)

        proxy = structref.StructRefProxy.__new__(cls, ops, dat)
        # Keep the Python-side arrays reachable directly from the proxy.
        proxy.ops = ops
        proxy.dat = dat

        # The registries are reached through eval() on purpose (see the note
        # next to their definitions about pickling errors).
        eval('_graph_cache')[graph_idx] = proxy
        numba_types = (
            nb.typeof(ops),
            nb.typeof(ops[0]),
            nb.typeof(dat),
            nb.typeof(dat[0]),
        )
        eval('_graph_types_cache')[proxy] = numba_types
        return proxy
# This associates the Graph proxy with GraphType for the given set of
# fields ("ops" and "dat"). Notice how we are not constraining the type of
# each field. Field types remain generic.
structref.define_proxy(Graph, GraphType, ["ops", "dat"])
class Operation(ak.Record):
    """Record behavior for one entry of a graph's ``ops`` array."""

    def graph(self):
        """Return the graph this operation belongs to.

        The graph index is read from the ``'graph'`` entry of the record's
        behavior dict and resolved through ``get_graph_from_idx``.
        """
        graph_idx = self.behavior['graph']
        return get_graph_from_idx(graph_idx)

    def inputs(self):
        """Return the list of data records selected by ``inputs_idx``."""
        indices = self.inputs_idx
        return [self.graph().dat[i] for i in indices]

    def outputs(self):
        """Return the list of data records selected by ``outputs_idx``."""
        indices = self.outputs_idx
        return [self.graph().dat[i] for i in indices]
def get_graph_from_idx(idx):
    """Return the cached graph registered under index ``idx``.

    The lookup raises ``KeyError`` when no live graph is stored for ``idx``.
    """
    # eval() instead of a direct global reference: deliberate, to avoid
    # pickling errors.
    graphs_by_idx = eval('_graph_cache')
    return graphs_by_idx[idx]
def _get_types(idx):
    """Return the cached Numba types tuple of the graph with index ``idx``.

    The tuple is the one stored by ``Graph.__new__``: the Numba types of
    ``(ops, ops[0], dat, dat[0])``.
    """
    # Both registries are reached through eval() on purpose (pickling errors
    # otherwise); the types registry is resolved first, then the graph.
    types_by_graph = eval('_graph_types_cache')
    graph = eval('_graph_cache')[idx]
    return types_by_graph[graph]
class Data(ak.Record):
    """Record behavior for one entry of a graph's ``dat`` array."""

    def graph(self):
        """Return the graph this data record belongs to.

        The graph index is read from the ``'graph'`` entry of the record's
        behavior dict and resolved through ``get_graph_from_idx``.
        """
        graph_idx = self.behavior['graph']
        return get_graph_from_idx(graph_idx)

    def input(self):
        """Return the operation at ``input_idx``, or ``None`` if it is negative."""
        print('input()', self)
        if self.input_idx >= 0:
            return self.graph().ops[self.input_idx]
        return None

    def outputs(self):
        """Return the list of operation records selected by ``outputs_idx``."""
        indices = self.outputs_idx
        return [self.graph().ops[i] for i in indices]

    # The two functions below deliberately take no ``self``: they are looked
    # up on the class and registered in the ``dat`` behavior dict as the
    # Numba typer / lowering hooks for ``Data.input()``.

    def _input__numba_typer__(akt, extra=()):
        """Return the Numba signature of ``Data.input()``.

        The return type is an Optional of the second entry of the graph's
        cached types tuple (the type of a single operation record); the
        signature takes no arguments.
        """
        operation_record_type = _get_types(akt.behavior['graph'])[1]
        return nb.types.Optional(operation_record_type)()

    def _input__numba_lower__(context, builder, sig, args):
        """Lower ``Data.input()`` by calling the Python method inside objmode."""
        print(f'_input__numba_lower__: sig.args={[type(a) for a in sig.args]}, sig.return_type={type(sig.return_type)}, {len(args)=}')
        ret_type = sig.return_type

        def input_lower(dat):
            # Fixme: TypingError: Failed in nopython mode pipeline
            print('get_node() for:', dat.idx)
            # Fall back to the interpreter for the actual lookup and hand the
            # result back typed as the signature's return type.
            with nb.objmode(inp=ret_type):
                inp = dat.input()
            return inp

        return context.compile_internal(builder, input_lower, sig, args)
# Example graph: three operations and four data records, cross-referenced by
# index. A negative `input_idx` marks a data record with no generating
# operation.
operations = ak.Array({
    'idx': range(3),
    'inputs_idx': [[], [0, 1], [1, 2]],
    'outputs_idx': [[1], [2, 3], []],
})
data = ak.Array({
    'idx': range(4),
    'input_idx': [-1, 0, 1, 1],
    'input_port': [-1, 0, 0, 1],
    'outputs_idx': [[1], [1, 2], [2], []],
})

# Arbitrary extra fields, attached to the operations one field at a time.
extra_op_params = {'prop1': np.sin(np.arange(3))}
for k, v in extra_op_params.items():
    operations[k] = v

g = Graph(operations, data)
@nb.njit()
def func(l, is_compiled=False):
    """Print data record ``l`` and the result of ``l.input()``.

    ``l.input()`` is called twice: once inside an objmode block and once
    directly, so that the compiled run exercises the Numba typer/lowering
    registered for ``Data.input()``. ``is_compiled`` is only echoed in the
    final message.
    """
    print('data:', l.idx, l)
    with nb.objmode():
        print('objmode input:', l.input())
    inp = l.input()
    if inp is not None:
        print('inp:', inp)
    else:
        print('inp:', 'is None')
    print('Done:', is_compiled)
# Run the plain-Python version of `func` first (py_func bypasses Numba
# compilation), then print a separator before the compiled call that follows.
func.py_func(g.dat[1])
print('----------------')
func(g.dat[1], True)
I've run out of ideas for implementing this. The error I'm getting is:
|
Beta Was this translation helpful? Give feedback.
Replies: 5 comments 3 replies
-
What is |
Beta Was this translation helpful? Give feedback.
-
Changing the implementation to:
def _input__numba_lower__(context, builder, sig, args):
    print(f'_driver__numba_lower__: sig.args={[type(a) for a in sig.args]}, sig.return_type={type(sig.return_type)}, {len(args)=}')
    ret_type = sig.return_type
    @nb.njit(ret_type(*sig.args))
    def get_data_input(dat):
        with nb.objmode(inp=ret_type):
            inp = dat.input()
        return inp
    def input_lower(dat):
        # Fixme: TypingError: Failed in nopython mode pipeline
        print('get_node() for:', dat.idx)
        return get_data_input(dat)
    return context.compile_internal(builder, input_lower, sig, args)
results in another strange Numba error:
|
Beta Was this translation helpful? Give feedback.
-
Hi! Unfortunately, I can't tell, by reading the code, whether it's supposed to work or not. Even if it were my own code, I would break it down into smaller pieces and get each piece working before attempting these large code blocks that mix many (probably incompatible) concepts. If one piece doesn't work that seems like it ought to, that would be a bug report. Awkward Arrays are intended to be used as arguments and return values of functions decorated as Beyond that, if you want to add methods to
I can't tell what the weakrefs are for, but I doubt they play well with Numba. You probably can't pass them as arguments to a compiled function or as parts of a Why The
looked to me like a mangled C++ name, but I ran it through |
Beta Was this translation helpful? Give feedback.
-
I finally got it working.
def _input__numba_lower__(context, builder, sig, args):
    """Lower ``Data.input()`` for Numba by calling back into Python via objmode.

    The Python ``input()`` method is invoked inside an objmode block of a
    separately jitted helper compiled with the explicit signature
    ``ret_type(*sig.args)``; the function handed to
    ``context.compile_internal`` only forwards to that helper.

    Args:
        context: Numba lowering context.
        builder: LLVM IR builder for the call site.
        sig: Signature of the call; ``sig.return_type`` is used as the
            objmode output type.
        args: Lowered argument values.

    Returns:
        The value produced by ``context.compile_internal``.
    """
    print(f'_driver__numba_lower__: sig.args={[type(a) for a in sig.args]}, sig.return_type={type(sig.return_type)}, {len(args)=}')
    ret_type = sig.return_type

    @nb.njit(ret_type(*sig.args))
    def get_data_input(dat):
        with nb.objmode(inp=ret_type):
            inp = dat.input()
            # Touch `numba_type` so that the numba view is created for the
            # returned record. >>> Removing this access causes a segfault.
            # getattr with a default keeps this a no-op when input() returns
            # None (a data record without a generating operation); a plain
            # `inp.numba_type` would raise AttributeError in that case.
            getattr(inp, 'numba_type', None)
        return inp

    def input_lower(dat):
        # Do not print() here: a print in this function causes numba
        # "RuntimeError: missing Environment".
        return get_data_input(dat)

    return context.compile_internal(builder, input_lower, sig, args)
Beta Was this translation helpful? Give feedback.
-
I want to keep all of the discussions open. Issues get closed when they're done, but it's valuable to keep discussions around—even if they're resolved—because they're useful to other people with the same questions. |
Beta Was this translation helpful? Give feedback.
I finally got it working.
The "RuntimeError: missing Environment" got fixed by removing the print() call within the lower() function.
The segfault got fixed by accessing .numba_type on the returned record, so that the numba view is created for the record.
This segfault is what I have been trying to fix for a long while, and it is what led me to the complex code I shared, changing the graph implementation between a namedtuple, a jitclass, a structmodel and a structref.