
Commit 20acd06

working on reading ROOT through standalone; added exception handling in Fetchers

jpivarski committed Apr 12, 2017
1 parent 92995c3
Showing 13 changed files with 522 additions and 96 deletions.
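Both Fetcher threads touched below (numpyio and rootio) gain the same failure protocol: the whole body of run() is wrapped in a try block, and any exception is recorded on every occupant under its lock as an ExecutionFailure rather than killing the worker thread silently. A minimal sketch of the pattern, using stand-ins for CacheOccupant and ExecutionFailure (do_fetch is a hypothetical callback, not part of the real code):

    import sys
    import threading

    class FailureSketch(object):
        # stand-in for femtocode.execution.ExecutionFailure
        def __init__(self, exception, traceback):
            self.exception, self.traceback = exception, traceback

    class OccupantSketch(object):
        # stand-in for femtocode.run.cache.CacheOccupant
        def __init__(self):
            self.lock = threading.Lock()
            self.fetchfailure = None    # polled by whoever consumes this cache slot

    class FetcherSketch(threading.Thread):
        def __init__(self, occupants, do_fetch):
            super(FetcherSketch, self).__init__()
            self.occupants = occupants
            self.do_fetch = do_fetch    # hypothetical fetching callback
            self.daemon = True

        def run(self):
            try:
                self.do_fetch(self.occupants)
            except Exception as exception:
                # park the failure on every occupant; the consumer of the
                # cache entry re-raises it on the query's behalf
                for occupant in self.occupants:
                    with occupant.lock:
                        occupant.fetchfailure = FailureSketch(exception, sys.exc_info()[2])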
10 changes: 7 additions & 3 deletions lang/femtocode/asts/statementlist.py
@@ -501,7 +501,7 @@ def __hash__(self):
         return hash(("statementlist.ExplodeSize", self.column, self.explosions))
 
     def columnNames(self):
-        return [self.column] + list(self.explosions)
+        return [self.column]
 
     def inputSizes(self, statements):
         return list(self.explosions)
@@ -542,7 +542,7 @@ def __hash__(self):
         return hash(("statementlist.ExplodeData", self.column, self.schema, self.data, self.fromsize, self.explodesize, self.explosions))
 
     def columnNames(self):
-        return [self.column, self.data, self.fromsize, self.explodesize] + list(self.explosions)
+        return [self.column, self.data, self.fromsize, self.explodesize]
 
     def inputSizes(self, statements):
        for statement in statements:
@@ -631,6 +631,10 @@ def build(tree, dataset, replacements={}, refnumber=0, explosions=()):
             inputs = {ref.data: tree.schema}
         else:
             inputs = {}
+        if ref.size is not None:
+            for c in dataset.columns.values():
+                if ref.size == c.size:
+                    inputs[ref.size] = None
 
         return ref, Statements(), inputs, replacements, refnumber
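This four-line lookup recurs twice more in lang/femtocode/lib/standard.py below: when a Ref carries a size column, it becomes an input only if some dataset column actually declares that size, and it is registered with a None schema, size arrays being counters rather than typed data. A small sketch of the lookup with made-up column and size names:

    class ColumnSketch(object):
        # stand-in for a dataset column; .size names its size column, or None
        def __init__(self, size=None):
            self.size = size

    def register_size(refsize, columns, inputs):
        if refsize is not None:
            for c in columns.values():
                if refsize == c.size:
                    inputs[refsize] = None    # size columns carry no schema
        return inputs

    columns = {"muons-pt": ColumnSketch(size="muons@size"),
               "met": ColumnSketch()}
    print(register_size("muons@size", columns, {}))   # {'muons@size': None}
    print(register_size("jets@size", columns, {}))    # {} because no column uses it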

@@ -811,7 +815,7 @@ def columns(self):
         return [r.size for n, r in self.namesToRefs if isinstance(r, Ref) and r.size is not None] + [r.data for n, r in self.namesToRefs if isinstance(r, Ref)]
 
     def columnNames(self):
-        return sum(self.namesToRefs.columnNames(), [])
+        return sum([x.columnNames() for x in self.namesToRefs.values()], [])
 
     def initialize(self):
         from femtocode.testdataset import TestDataset
7 changes: 7 additions & 0 deletions lang/femtocode/dataset.py
@@ -416,6 +416,13 @@ def dataset(self, name, groups=(), columns=None, schema=True):
             for column in list(dataset.columns):
                 if column not in columns:
                     del dataset.columns[column]
+            for group in dataset.groups:
+                todrop = []
+                for n, seg in group.segments.items():
+                    if n not in columns:
+                        todrop.append(n)
+                while len(todrop) > 0:
+                    del group.segments[todrop.pop()]
 
         # drop schema if not requested
         if not schema:
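The new pruning loop deliberately collects segment names first and deletes afterwards: removing keys from group.segments while iterating over group.segments.items() would raise a RuntimeError on Python 3, where items() is a view rather than a copy. The same two-phase shape in isolation:

    segments = {"muons-pt": object(), "met": object(), "jets-e": object()}
    wanted = {"met"}

    todrop = []
    for n in segments:
        if n not in wanted:
            todrop.append(n)
    while len(todrop) > 0:
        del segments[todrop.pop()]    # safe: iteration already finished

    print(sorted(segments))   # ['met']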
8 changes: 8 additions & 0 deletions lang/femtocode/lib/standard.py
@@ -740,6 +740,10 @@ def buildstatements(self, call, dataset, replacements, refnumber, explosions):
 
         if reref.data in dataset.columns:
             inputs[reref.data] = reref.schema
+        if reref.size is not None:
+            for c in dataset.columns.values():
+                if reref.size == c.size:
+                    inputs[reref.size] = None
 
         return reref, statements, inputs, replacements, refnumber

@@ -787,6 +791,10 @@ def buildstatements(self, call, dataset, replacements, refnumber, explosions):
             inputs = {reref.data: reref.schema}
         else:
             inputs = {}
+        if reref.size is not None:
+            for c in dataset.columns.values():
+                if reref.size == c.size:
+                    inputs[reref.size] = None
 
         replacements[(typedtree.TypedTree, call.args[1].refs[0])] = replacements.get((typedtree.TypedTree, call.args[1].refs[0]), {})
         replacements[(typedtree.TypedTree, call.args[1].refs[0])][extendedExplosions] = reref
85 changes: 48 additions & 37 deletions numpyio/femtocode/numpyio/fetch.py
@@ -29,6 +29,7 @@
 from femtocode.py23 import *
 from femtocode.dataset import ColumnName
 from femtocode.dataset import sizeType
+from femtocode.execution import ExecutionFailure
 from femtocode.run.compute import DataAddress
 from femtocode.run.cache import CacheOccupant
 from femtocode.numpyio.xrootd import XRootDReader
@@ -43,8 +44,12 @@ def __init__(self, occupants, workItem):
         self.daemon = True
 
     def files(self, column):
+        out = None
         if column.issize():
-            out = self.workItem.group.segments[column.dropsize()].files
+            for n, c in self.workItem.executor.query.dataset.columns.items():
+                if c.size == column:
+                    out = self.workItem.group.segments[n].files
+                    break
         else:
             out = self.workItem.group.segments[column].files
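Both fetchers drop the old assumption that a size column's files can be found by stripping the size suffix from its own name (column.dropsize()). Now that ROOT counter branches let several data columns share one named size column, the lookup runs in reverse: scan the dataset's columns for any whose size field equals the requested size column and borrow that segment's file list. The same logic in isolation, with a string suffix standing in for column.issize():

    def files_for(column, columns, segments):
        # columns: name -> column object with a .size attribute
        # segments: name -> segment with a .files list
        if column.endswith("@size"):            # stand-in for column.issize()
            for n, c in columns.items():
                if c.size == column:
                    return segments[n].files    # any sharer's segment works
            return None
        return segments[column].files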

@@ -53,45 +58,51 @@ def files(self, column):
         return out
 
     def run(self):
-        filesToOccupants = {}
-
-        for occupant in self.occupants:
-            for fileName in self.files(occupant.address.column):
-                if fileName not in filesToOccupants:
-                    filesToOccupants[fileName] = []
-                filesToOccupants[fileName].append(occupant)
-
-        for fileName, occupants in filesToOccupants.items():
-            protocol = urlparse(fileName).scheme
-            if protocol == "":
-                zf = zipfile.ZipFile(open(fileName, "rb"))
-            elif protocol == "root":
-                zf = zipfile.ZipFile(XRootDReader(fileName))
-            else:
-                raise NotImplementedError
-
-            for occupant in occupants:
-                stream = zf.open(str(occupant.address.column) + ".npy")
-                assert stream.read(6) == "\x93NUMPY"
-
-                version = struct.unpack("bb", stream.read(2))
-                if version[0] == 1:
-                    headerlen, = struct.unpack("<H", stream.read(2))
-                else:
-                    headerlen, = struct.unpack("<I", stream.read(4))
-
-                header = stream.read(headerlen)
-                headerdata = ast.literal_eval(header)
-
-                dtype = numpy.dtype(headerdata["descr"])
-                numBytes = reduce(lambda a, b: a * b, (dtype.itemsize,) + headerdata["shape"])
-
-                assert occupant.totalBytes == numBytes
-
-                readBytes = 0
-                while readBytes < numBytes:
-                    size = min(self.chunksize, numBytes - readBytes)
-                    readBytes += size
-                    occupant.fill(stream.read(size))
-
-            zf.close()
+        try:
+            filesToOccupants = {}
+
+            for occupant in self.occupants:
+                for fileName in self.files(occupant.address.column):
+                    if fileName not in filesToOccupants:
+                        filesToOccupants[fileName] = []
+                    filesToOccupants[fileName].append(occupant)
+
+            for fileName, occupants in filesToOccupants.items():
+                protocol = urlparse(fileName).scheme
+                if protocol == "":
+                    zf = zipfile.ZipFile(open(fileName, "rb"))
+                elif protocol == "root":
+                    zf = zipfile.ZipFile(XRootDReader(fileName))
+                else:
+                    raise NotImplementedError
+
+                for occupant in occupants:
+                    stream = zf.open(str(occupant.address.column) + ".npy")
+                    assert stream.read(6) == "\x93NUMPY"
+
+                    version = struct.unpack("bb", stream.read(2))
+                    if version[0] == 1:
+                        headerlen, = struct.unpack("<H", stream.read(2))
+                    else:
+                        headerlen, = struct.unpack("<I", stream.read(4))
+
+                    header = stream.read(headerlen)
+                    headerdata = ast.literal_eval(header)
+
+                    dtype = numpy.dtype(headerdata["descr"])
+                    numBytes = reduce(lambda a, b: a * b, (dtype.itemsize,) + headerdata["shape"])
+
+                    assert occupant.totalBytes == numBytes
+
+                    readBytes = 0
+                    while readBytes < numBytes:
+                        size = min(self.chunksize, numBytes - readBytes)
+                        readBytes += size
+                        occupant.fill(stream.read(size))
+
+                zf.close()
+
+        except Exception as exception:
+            for occupant in self.occupants:
+                with occupant.lock:
+                    occupant.fetchfailure = ExecutionFailure(exception, sys.exc_info()[2])
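The inlined reader above follows the published .npy format: a six-byte magic string, two version bytes, a little-endian header length (2 bytes in format 1.0, 4 bytes in 2.0 and later), then a Python-literal dict giving the dtype descriptor and shape, after which the raw array bytes begin and can be streamed out in chunks. The same parse as a standalone function (Python 3 spelling, hence the b"" literal and the decode; the example file name is made up):

    import ast
    import struct
    from functools import reduce

    import numpy

    def read_npy_header(stream):
        # .npy preamble: magic, version, header length, header dict
        assert stream.read(6) == b"\x93NUMPY"
        major, minor = struct.unpack("bb", stream.read(2))
        if major == 1:
            headerlen, = struct.unpack("<H", stream.read(2))
        else:
            headerlen, = struct.unpack("<I", stream.read(4))
        headerdata = ast.literal_eval(stream.read(headerlen).decode("ascii"))

        dtype = numpy.dtype(headerdata["descr"])
        numBytes = reduce(lambda a, b: a * b, (dtype.itemsize,) + headerdata["shape"])
        return dtype, headerdata["shape"], numBytes

    with open("example.npy", "rb") as stream:   # any .npy file
        print(read_npy_header(stream))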
8 changes: 4 additions & 4 deletions rootio/femtocode/rootio/dataset.py
@@ -157,7 +157,7 @@ def _getBranchesForPaths(quantity, paths):
                 paths[(path, quantity.frm.tree)].append(quantity.frm.branch)
 
     @staticmethod
-    def _makeGroups(quantity, filesToNumEntries, fileColumnsToLengths, pathsToFiles, fileColumnsToSize, name=None):
+    def _makeGroups(quantity, filesToNumEntries, fileColumnsToLengths, pathsToFiles, fileColumnsToSize, name=None, sizename=None):
         if isinstance(quantity, DatasetDeclaration) or isinstance(quantity, DatasetDeclaration.Record):
             newColumns = {}
@@ -168,7 +168,7 @@ def _makeGroups(quantity, filesToNumEntries, fileColumnsToLengths, pathsToFiles,
             else:
                 subname = name.rec(k)
 
-            columns, groups = ROOTDataset._makeGroups(v, filesToNumEntries, fileColumnsToLengths, pathsToFiles, fileColumnsToSize, subname)
+            columns, groups = ROOTDataset._makeGroups(v, filesToNumEntries, fileColumnsToLengths, pathsToFiles, fileColumnsToSize, subname, sizename)
 
             for n, c in columns.items():
                 assert n not in newColumns
@@ -233,7 +233,7 @@ def _makeGroups(quantity, filesToNumEntries, fileColumnsToLengths, pathsToFiles,
             return newColumns, newGroups
 
         elif isinstance(quantity, DatasetDeclaration.Collection):
-            return ROOTDataset._makeGroups(quantity.items, filesToNumEntries, fileColumnsToLengths, pathsToFiles, fileColumnsToSize, name.coll())
+            return ROOTDataset._makeGroups(quantity.items, filesToNumEntries, fileColumnsToLengths, pathsToFiles, fileColumnsToSize, name.coll(), name.coll().size())
 
         elif isinstance(quantity, DatasetDeclaration.Primitive):
             sizeBranch = ()
@@ -247,7 +247,7 @@ def _makeGroups(quantity, filesToNumEntries, fileColumnsToLengths, pathsToFiles,
                 raise DatasetDeclaration.Error(quantity.frm.loc, "branch {0} has a counter branch in some files and not others".format(json.dumps(quantity.frm.branch)))
 
             column = ROOTColumn(name,
-                                name.size() if sizeBranch is not None else None,
+                                sizename,
                                 quantity.frm.dtype,
                                 quantity.frm.tree,
                                 quantity.frm.branch,
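The effect of the new sizename argument: a size column's name is fixed once, where the collection is entered (name.coll().size()), and handed down unchanged to every primitive underneath, so sibling leaves of the same collection now share a single size column instead of each deriving its own via name.size(). A toy recursion showing the shape of the bookkeeping (plain strings instead of ColumnName, and the name spellings are illustrative, not femtocode's):

    def make_columns(quantity, name, sizename=None, out=None):
        # quantity is (kind, payload): "record" -> dict of fields,
        # "collection" -> contained quantity, "primitive" -> dtype
        out = {} if out is None else out
        kind, payload = quantity
        if kind == "record":
            for k, v in payload.items():
                make_columns(v, name + "-" + k if name else k, sizename, out)
        elif kind == "collection":
            make_columns(payload, name + "[]", name + "[]@size", out)
        else:
            out[name] = sizename    # leaf -> its shared size column, or None
        return out

    decl = ("record", {"met": ("primitive", "float64"),
                       "muons": ("collection",
                                 ("record", {"pt": ("primitive", "float64"),
                                             "eta": ("primitive", "float64")}))})
    print(make_columns(decl, ""))
    # met -> None; muons[]-pt and muons[]-eta share the one 'muons[]@size'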
90 changes: 51 additions & 39 deletions rootio/femtocode/rootio/fetch.py
@@ -14,10 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import sys
 import threading
 
 from femtocode.dataset import ColumnName
 from femtocode.dataset import sizeType
+from femtocode.execution import ExecutionFailure
 from femtocode.run.compute import DataAddress
 from femtocode.rootio._fastreader import fillarrays
@@ -29,8 +31,12 @@ def __init__(self, occupants, workItem):
         self.daemon = True
 
     def files(self, column):
+        out = None
         if column.issize():
-            out = self.workItem.group.segments[column.dropsize()].files
+            for n, c in self.workItem.executor.query.dataset.columns.items():
+                if c.size == column:
+                    out = self.workItem.group.segments[n].files
+                    break
         else:
             out = self.workItem.group.segments[column].files

@@ -39,45 +45,51 @@ def files(self, column):
         return out
 
     def run(self):
-        columnNameToArray = {}
-        filesetsToColumns = {}
-        filesetsToOccupants = {}
-
-        for occupant in self.occupants:
-            column = occupant.address.column
-            columnNameToArray[column] = occupant.rawarray
-
-            filesetTree = (tuple(sorted(self.files(column))),
-                           self.workItem.executor.dataset.columns[column].tree)
-
-            if not column.issize():
-                if filesetTree not in filesetsToColumns:
-                    filesetsToColumns[filesetTree] = []
-                filesetsToColumns[filesetTree].append(column)
-
-            if filesetTree not in filesetsToOccupants:
-                filesetsToOccupants[filesetTree] = []
-            filesetsToOccupants[filesetTree].append(occupant)
-
-        for (fileset, tree), columns in filesetsToColumns.items():
-            toget = []
-            for column in columns:
-                if not column.issize():
-                    dataBranch = self.workItem.executor.dataset.columns[column].dataBranch
-                    sizeBranch = self.workItem.executor.dataset.columns[column].sizeBranch
-                    dataArray = columnNameToArray[column].view(self.workItem.executor.dataset.columns[column].dataType)
-
-                    if sizeBranch is None:
-                        toget.append((dataBranch, dataArray))
-                    else:
-                        sizeArray = columnNameToArray.get(str(column.size()))
-                        if sizeArray is not None:
-                            sizeArray = sizeArray.view(sizeType)
-
-                        toget.append((dataBranch, sizeBranch, dataArray, sizeArray))
-
-            for file in fileset:
-                fillarrays(file, tree, toget)
-
-            for occupant in filesetsToOccupants[(fileset, tree)]:
-                occupant.filledBytes = occupant.totalBytes
+        try:
+            columnNameToArray = {}
+            filesetsToColumns = {}
+            filesetsToOccupants = {}
+
+            for occupant in self.occupants:
+                column = occupant.address.column
+                columnNameToArray[column] = occupant.rawarray
+
+                filesetTree = (tuple(sorted(self.files(column))),
+                               self.workItem.executor.query.dataset.columns[column].tree)
+
+                if not column.issize():
+                    if filesetTree not in filesetsToColumns:
+                        filesetsToColumns[filesetTree] = []
+                    filesetsToColumns[filesetTree].append(column)
+
+                if filesetTree not in filesetsToOccupants:
+                    filesetsToOccupants[filesetTree] = []
+                filesetsToOccupants[filesetTree].append(occupant)
+
+            for (fileset, tree), columns in filesetsToColumns.items():
+                toget = []
+                for column in columns:
+                    if not column.issize():
+                        dataBranch = self.workItem.executor.query.dataset.columns[column].dataBranch
+                        sizeBranch = self.workItem.executor.query.dataset.columns[column].sizeBranch
+                        dataArray = columnNameToArray[column].view(self.workItem.executor.query.dataset.columns[column].dataType)
+
+                        if sizeBranch is None:
+                            toget.append((dataBranch, dataArray))
+                        else:
+                            sizeArray = columnNameToArray.get(str(column.size()))
+                            if sizeArray is not None:
+                                sizeArray = sizeArray.view(sizeType)
+
+                            toget.append((dataBranch, sizeBranch, dataArray, sizeArray))
+
+                for file in fileset:
+                    fillarrays(file, tree, toget)
+
+                for occupant in filesetsToOccupants[(fileset, tree)]:
+                    occupant.filledBytes = occupant.totalBytes
+
+        except Exception as exception:
+            for occupant in self.occupants:
+                with occupant.lock:
+                    occupant.fetchfailure = ExecutionFailure(exception, sys.exc_info()[2])
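All branches of one (fileset, tree) pair are gathered into a single toget list so fillarrays can fill every preallocated array in one pass over each file: 2-tuples (data branch, data array) for plain branches, 4-tuples (data branch, counter branch, data array, size array) for counted ones. A usage sketch; the file, tree, and branch names are made up, and the array lengths would really come from the segment metadata:

    import numpy
    from femtocode.dataset import sizeType
    from femtocode.rootio._fastreader import fillarrays

    met = numpy.empty(1000, dtype=numpy.float64)        # one value per entry
    muons_pt = numpy.empty(5000, dtype=numpy.float64)   # flattened list contents
    muons_size = numpy.empty(1000, dtype=sizeType)      # counts per entry

    toget = [("met", met),                                   # branch without a counter
             ("muons.pt", "nMuons", muons_pt, muons_size)]   # branch with a counter

    fillarrays("root://xrootd.somewhere//store/events.root", "Events", toget)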
15 changes: 15 additions & 0 deletions rootio/tests/rootio/__init__.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+
+# Copyright 2016 DIANA-HEP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.