diff --git a/lang/femtocode/asts/statementlist.py b/lang/femtocode/asts/statementlist.py
index 2ea2fbf..b3237e9 100644
--- a/lang/femtocode/asts/statementlist.py
+++ b/lang/femtocode/asts/statementlist.py
@@ -501,7 +501,7 @@ def __hash__(self):
         return hash(("statementlist.ExplodeSize", self.column, self.explosions))
 
     def columnNames(self):
-        return [self.column] + list(self.explosions)
+        return [self.column]
 
     def inputSizes(self, statements):
         return list(self.explosions)
@@ -542,7 +542,7 @@ def __hash__(self):
         return hash(("statementlist.ExplodeData", self.column, self.schema, self.data, self.fromsize, self.explodesize, self.explosions))
 
     def columnNames(self):
-        return [self.column, self.data, self.fromsize, self.explodesize] + list(self.explosions)
+        return [self.column, self.data, self.fromsize, self.explodesize]
 
     def inputSizes(self, statements):
         for statement in statements:
@@ -631,6 +631,10 @@ def build(tree, dataset, replacements={}, refnumber=0, explosions=()):
             inputs = {ref.data: tree.schema}
         else:
            inputs = {}
+        if ref.size is not None:
+            for c in dataset.columns.values():
+                if ref.size == c.size:
+                    inputs[ref.size] = None
 
         return ref, Statements(), inputs, replacements, refnumber
 
@@ -811,7 +815,7 @@ def columns(self):
         return [r.size for n, r in self.namesToRefs if isinstance(r, Ref) and r.size is not None] + [r.data for n, r in self.namesToRefs if isinstance(r, Ref)]
 
     def columnNames(self):
-        return sum(self.namesToRefs.columnNames(), [])
+        return sum([x.columnNames() for x in self.namesToRefs.values()], [])
 
     def initialize(self):
         from femtocode.testdataset import TestDataset
diff --git a/lang/femtocode/dataset.py b/lang/femtocode/dataset.py
index b1f21a4..ce083c7 100644
--- a/lang/femtocode/dataset.py
+++ b/lang/femtocode/dataset.py
@@ -416,6 +416,13 @@ def dataset(self, name, groups=(), columns=None, schema=True):
             for column in list(dataset.columns):
                 if column not in columns:
                     del dataset.columns[column]
+            for group in dataset.groups:
+                todrop = []
+                for n, seg in group.segments.items():
+                    if n not in columns:
+                        todrop.append(n)
+                while len(todrop) > 0:
+                    del group.segments[todrop.pop()]
 
         # drop schema if not requested
         if not schema:
diff --git a/lang/femtocode/lib/standard.py b/lang/femtocode/lib/standard.py
index ec599d9..34c6f73 100644
--- a/lang/femtocode/lib/standard.py
+++ b/lang/femtocode/lib/standard.py
@@ -740,6 +740,10 @@ def buildstatements(self, call, dataset, replacements, refnumber, explosions):
 
         if reref.data in dataset.columns:
             inputs[reref.data] = reref.schema
+        if reref.size is not None:
+            for c in dataset.columns.values():
+                if reref.size == c.size:
+                    inputs[reref.size] = None
 
         return reref, statements, inputs, replacements, refnumber
 
@@ -787,6 +791,10 @@ def buildstatements(self, call, dataset, replacements, refnumber, explosions):
             inputs = {reref.data: reref.schema}
         else:
             inputs = {}
+        if reref.size is not None:
+            for c in dataset.columns.values():
+                if reref.size == c.size:
+                    inputs[reref.size] = None
 
         replacements[(typedtree.TypedTree, call.args[1].refs[0])] = replacements.get((typedtree.TypedTree, call.args[1].refs[0]), {})
         replacements[(typedtree.TypedTree, call.args[1].refs[0])][extendedExplosions] = reref
diff --git a/numpyio/femtocode/numpyio/fetch.py b/numpyio/femtocode/numpyio/fetch.py
index f7448ed..8ca0fe7 100644
--- a/numpyio/femtocode/numpyio/fetch.py
+++ b/numpyio/femtocode/numpyio/fetch.py
@@ -29,6 +29,7 @@
 from femtocode.py23 import *
 from femtocode.dataset import ColumnName
 from femtocode.dataset import sizeType
+from femtocode.execution import ExecutionFailure
 from femtocode.run.compute import DataAddress
 from femtocode.run.cache import CacheOccupant
 from femtocode.numpyio.xrootd import XRootDReader
@@ -43,8 +44,12 @@ def __init__(self, occupants, workItem):
         self.daemon = True
 
     def files(self, column):
+        out = None
         if column.issize():
-            out = self.workItem.group.segments[column.dropsize()].files
+            for n, c in self.workItem.executor.query.dataset.columns.items():
+                if c.size == column:
+                    out = self.workItem.group.segments[n].files
+                    break
         else:
             out = self.workItem.group.segments[column].files
 
@@ -53,45 +58,51 @@ def files(self, column):
         return out
 
     def run(self):
-        filesToOccupants = {}
-
-        for occupant in self.occupants:
-            for fileName in self.files(occupant.address.column):
-                if fileName not in filesToOccupants:
-                    filesToOccupants[fileName] = []
-                filesToOccupants[fileName].append(occupant)
-
-        for fileName, occupants in filesToOccupants.items():
-            protocol = urlparse(fileName).scheme
-            if protocol == "":
-                zf = zipfile.ZipFile(open(fileName, "rb"))
-            elif protocol == "root":
-                zf = zipfile.ZipFile(XRootDReader(fileName))
-            else:
-                raise NotImplementedError
-
-            for occupant in occupants:
-                stream = zf.open(str(occupant.address.column) + ".npy")
-                assert stream.read(6) == "\x93NUMPY"
-
-                version = struct.unpack("bb", stream.read(2))
-                if version[0] == 1:
-                    headerlen, = struct.unpack("
diff --git a/run/femtocode/run/cache.py b/run/femtocode/run/cache.py
--- a/run/femtocode/run/cache.py
+++ b/run/femtocode/run/cache.py
@@ -115,6 +115,16 @@ def add(self, occupant):
         self.order.append(occupant)
         self.lookup[occupant.address] = occupant
 
+    def discard(self, occupant):
+        try:
+            self.order.remove(occupant)
+        except ValueError:
+            pass
+        try:
+            del self.lookup[occupant.address]
+        except KeyError:
+            pass
+
     def extract(self, address):
         assert address in self.lookup
         occupant = None
@@ -157,7 +167,24 @@ def demoteNeedsToWants(self):
         for occupant in todemote:
             del self.need[occupant.address]
             self.want.add(occupant)
-
+
+    def removeFailures(self):
+        toremove = []
+        for occupant in self.need.values():
+            with occupant.lock:
+                if occupant.fetchfailure is not None:
+                    toremove.append(occupant)
+        for occupant in toremove:
+            del self.need[occupant.address]
+
+        toremove = []
+        for occupant in self.want:
+            with occupant.lock:
+                if occupant.fetchfailure is not None:
+                    toremove.append(occupant)
+        for occupant in toremove:
+            self.want.discard(occupant)
+
     def howManyToEvict(self, workItem):
         required = workItem.required()
@@ -310,6 +337,11 @@ def run(self):
                     toremove.append(index)
                     self.outgoing.put(workItem)
                     workItem.executor.oneLoadDone(workItem.group.id)
+                else:
+                    fetchfailure = workItem.fetchfailure()
+                    if fetchfailure is not None:
+                        workItem.executor.oneFailure(fetchfailure)
+                        toremove.append(index)
 
             while len(toremove) > 0:
                 del self.loading[toremove.pop()]
diff --git a/run/femtocode/run/compute.py b/run/femtocode/run/compute.py
index 2f515c7..fb80d01 100644
--- a/run/femtocode/run/compute.py
+++ b/run/femtocode/run/compute.py
@@ -75,6 +75,14 @@ def ready(self):
         assert len(self.occupants) != 0
         return all(occupant.ready() for occupant in self.occupants)
 
+    def fetchfailure(self):
+        for occupant in self.occupants:
+            with occupant.lock:
+                fetchfailure = occupant.fetchfailure
+            if fetchfailure is not None:
+                return fetchfailure
+        return None
+
     def decrementNeed(self):
         assert len(self.occupants) != 0
         for occupant in self.occupants:
diff --git a/run/femtocode/run/standalone.py b/run/femtocode/run/standalone.py
index 18a0b03..2aca276 100644
--- a/run/femtocode/run/standalone.py
+++ b/run/femtocode/run/standalone.py
@@ -115,7 +115,7 @@ def source(self, name):
     def submit(self, query, ondone=None, onupdate=None, debug=False):
         # attach a more detailed Dataset to the query (same content, but with runtime details)
-        query.dataset = self.metadata.dataset(query.dataset.name, list(xrange(query.dataset.numGroups)), query.statements.columnNames(), False)
+        query.dataset = self.metadata.dataset(query.dataset.name, list(xrange(query.dataset.numGroups)), query.inputs.keys(), False)
 
         # create an executor with a reference to the FutureQueryResult we will return to the user
         query.lock = threading.Lock()
@@ -133,17 +133,23 @@ def submit(self, query, ondone=None, onupdate=None, debug=False):
     session = StandaloneSession()
     session.metadata.directory = "/home/pivarski/diana/femtocode/tests"
 
-    def callback(outputdataset):
-        print outputdataset, len(list(outputdataset))
+    # def callback(outputdataset):
+    #     print outputdataset, len(list(outputdataset))
 
-    from femtocode.lib.custom import *
-    custom = CustomLibrary()
-    custom.add(CustomFlatFunction("mysin", "math", "sin", lambda x: real))
+    # from femtocode.lib.custom import *
+    # custom = CustomLibrary()
+    # custom.add(CustomFlatFunction("mysin", "math", "sin", lambda x: real))
 
-    result = session.source("xy").define(z = "x + y").toPython(a = "z - 3", b = "z - 0.5", c = "mysin(x)").submit(libs=custom)
+    # result = session.source("xy").define(z = "x + y").toPython(a = "z - 3", b = "z - 0.5", c = "mysin(x)").submit(libs=custom)
 
-    complete = result.await()
-    print result.wallTime, result.computeTime
+    # complete = result.await()
+    # print result.wallTime, result.computeTime
 
-    for event in complete:
+    # for event in complete:
+    #     print event
+
+    pending = session.source("MuOnia").toPython(pt = "muons.map($1.pt)").submit(debug = True)
+    final = pending.await()
+    print pending.wallTime, pending.computeTime
+    for event in final:
         print event
diff --git a/tests/MuOnia.json b/tests/MuOnia.json
index d78bcb7..a534cf6 100644
--- a/tests/MuOnia.json
+++ b/tests/MuOnia.json
@@ -1 +1 @@
-{"groups": [{"files": ["/home/pivarski/storage/data/00000000-0000-0000-0000-000000000000.root"], "segments": {"jets[]-mass": {"files": null, "numEntries": 48131, "sizeLength": 48131, "dataLength": 806177}, "jets[]-pt": {"files": null, "numEntries": 48131, "sizeLength": 48131, "dataLength": 806177}, "muons[]-pt": {"files": null, "numEntries": 48131, "sizeLength": 48131, "dataLength": 132274}, "muons[]-eta": {"files": null, "numEntries": 48131, "sizeLength": 48131, "dataLength": 132274}, "jets[]-eta": {"files": null, "numEntries": 48131, "sizeLength": 48131, "dataLength": 806177}, "jets[]-phi": {"files": null, "numEntries": 48131, "sizeLength": 48131, "dataLength": 806177}, "muons[]-phi": {"files": null, "numEntries": 48131, "sizeLength": 48131, "dataLength": 132274}}, "numEntries": 48131, "id": 0}], "numEntries": 48131, "name": "MuOnia", "columns": {"jets[]-mass": {"dataBranch": "patJets_slimmedJets__PAT.obj.m_state.p4Polar_.fCoordinates.fM", "dataType": "float64", "tree": "Events", "sizeBranch": "patJets_slimmedJets__PAT.obj", "data": "jets[]-mass", "size": "jets[]-mass@size"}, "jets[]-pt": {"dataBranch": "patJets_slimmedJets__PAT.obj.m_state.p4Polar_.fCoordinates.fPt", "dataType": "float64", "tree": "Events", "sizeBranch": "patJets_slimmedJets__PAT.obj", "data": "jets[]-pt", "size": "jets[]-pt@size"}, "muons[]-pt": {"dataBranch": "patMuons_slimmedMuons__PAT.obj.m_state.p4Polar_.fCoordinates.fPt", "dataType": "float64", "tree": "Events", "sizeBranch": "patMuons_slimmedMuons__PAT.obj", "data": "muons[]-pt", "size": "muons[]-pt@size"}, "muons[]-eta": {"dataBranch": "patMuons_slimmedMuons__PAT.obj.m_state.p4Polar_.fCoordinates.fEta", "dataType": "float64", "tree": "Events", "sizeBranch": "patMuons_slimmedMuons__PAT.obj", "data": "muons[]-eta", "size": "muons[]-eta@size"}, "jets[]-eta": {"dataBranch": "patJets_slimmedJets__PAT.obj.m_state.p4Polar_.fCoordinates.fEta", "dataType": "float64", "tree": "Events", "sizeBranch": "patJets_slimmedJets__PAT.obj", "data": "jets[]-eta", "size": "jets[]-eta@size"}, "jets[]-phi": {"dataBranch": "patJets_slimmedJets__PAT.obj.m_state.p4Polar_.fCoordinates.fPhi", "dataType": "float64", "tree": "Events", "sizeBranch": "patJets_slimmedJets__PAT.obj", "data": "jets[]-phi", "size": "jets[]-phi@size"}, "muons[]-phi": {"dataBranch": "patMuons_slimmedMuons__PAT.obj.m_state.p4Polar_.fCoordinates.fPhi", "dataType": "float64", "tree": "Events", "sizeBranch": "patMuons_slimmedMuons__PAT.obj", "data": "muons[]-phi", "size": "muons[]-phi@size"}}, "schema": {"jets": {"items": {"fields": {"phi": {"max": 3.141592653589793, "type": "real", "min": -3.141592653589793}, "eta": "real", "mass": {"max": {"almost": "inf"}, "type": "real", "min": 0}, "pt": {"max": {"almost": "inf"}, "type": "real", "min": 0}}, "type": "record"}, "type": "collection"}, "muons": {"items": {"fields": {"phi": {"max": 3.141592653589793, "type": "real", "min": -3.141592653589793}, "eta": "real", "pt": {"max": {"almost": "inf"}, "type": "real", "min": 0}}, "type": "record"}, "type": "collection"}}}
+{"name": "MuOnia", "numGroups": 1, "numEntries": 48131, "groups": [{"files": ["/home/pivarski/storage/data/00000000-0000-0000-0000-000000000000.root"], "segments": {"jets[]-mass": {"files": null, "sizeLength": 48131, "numEntries": 48131, "dataLength": 806177}, "jets[]-pt": {"files": null, "sizeLength": 48131, "numEntries": 48131, "dataLength": 806177}, "muons[]-pt": {"files": null, "sizeLength": 48131, "numEntries": 48131, "dataLength": 132274}, "muons[]-eta": {"files": null, "sizeLength": 48131, "numEntries": 48131, "dataLength": 132274}, "jets[]-eta": {"files": null, "sizeLength": 48131, "numEntries": 48131, "dataLength": 806177}, "jets[]-phi": {"files": null, "sizeLength": 48131, "numEntries": 48131, "dataLength": 806177}, "muons[]-phi": {"files": null, "sizeLength": 48131, "numEntries": 48131, "dataLength": 132274}}, "numEntries": 48131, "id": 0}], "class": "femtocode.rootio.dataset.ROOTDataset", "columns": {"jets[]-mass": {"dataBranch": "patJets_slimmedJets__PAT.obj.m_state.p4Polar_.fCoordinates.fM", "dataType": "float64", "tree": "Events", "sizeBranch": "patJets_slimmedJets__PAT.obj", "data": "jets[]-mass", "size": "jets[]@size"}, "jets[]-pt": {"dataBranch": "patJets_slimmedJets__PAT.obj.m_state.p4Polar_.fCoordinates.fPt", "dataType": "float64", "tree": "Events", "sizeBranch": "patJets_slimmedJets__PAT.obj", "data": "jets[]-pt", "size": "jets[]@size"}, "muons[]-pt": {"dataBranch": "patMuons_slimmedMuons__PAT.obj.m_state.p4Polar_.fCoordinates.fPt", "dataType": "float64", "tree": "Events", "sizeBranch": "patMuons_slimmedMuons__PAT.obj", "data": "muons[]-pt", "size": "muons[]@size"}, "muons[]-eta": {"dataBranch": "patMuons_slimmedMuons__PAT.obj.m_state.p4Polar_.fCoordinates.fEta", "dataType": "float64", "tree": "Events", "sizeBranch": "patMuons_slimmedMuons__PAT.obj", "data": "muons[]-eta", "size": "muons[]@size"}, "jets[]-eta": {"dataBranch": "patJets_slimmedJets__PAT.obj.m_state.p4Polar_.fCoordinates.fEta", "dataType": "float64", "tree": "Events", "sizeBranch": "patJets_slimmedJets__PAT.obj", "data": "jets[]-eta", "size": "jets[]@size"}, "jets[]-phi": {"dataBranch": "patJets_slimmedJets__PAT.obj.m_state.p4Polar_.fCoordinates.fPhi", "dataType": "float64", "tree": "Events", "sizeBranch": "patJets_slimmedJets__PAT.obj", "data": "jets[]-phi", "size": "jets[]@size"}, "muons[]-phi": {"dataBranch": "patMuons_slimmedMuons__PAT.obj.m_state.p4Polar_.fCoordinates.fPhi", "dataType": "float64", "tree": "Events", "sizeBranch": "patMuons_slimmedMuons__PAT.obj", "data": "muons[]-phi", "size": "muons[]@size"}}, "schema": {"jets": {"items": {"fields": {"phi": {"max": 3.141592653589793, "type": "real", "min": -3.141592653589793}, "eta": "real", "mass": {"max": {"almost": "inf"}, "type": "real", "min": 0}, "pt": {"max": {"almost": "inf"}, "type": "real", "min": 0}}, "type": "record"}, "type": "collection"}, "muons": {"items": {"fields": {"phi": {"max": 3.141592653589793, "type": "real", "min": -3.141592653589793}, "eta": "real", "pt": {"max": {"almost": "inf"}, "type": "real", "min": 0}}, "type": "record"}, "type": "collection"}}}