v0.5.5 performance improvements
akariv committed Apr 1, 2024
1 parent 5d9d0ce commit 82d30a9
Showing 6 changed files with 53 additions and 16 deletions.
2 changes: 1 addition & 1 deletion dataflows/VERSION
@@ -1 +1 @@
-0.5.4
+0.5.5
17 changes: 9 additions & 8 deletions dataflows/base/datastream_processor.py
@@ -96,15 +96,18 @@ def raise_exception(self, cause):
             raise error from cause
         raise cause

-    def safe_process(self, on_error=None):
+    def safe_process(self, return_results=False, on_error=None):
         results = []
         try:
             ds = self._process()
             for res in ds.res_iter:
-                if on_error is not None:
-                    results.append(list(
-                        schema_validator(res.res, res, on_error=on_error)
-                    ))
+                if return_results:
+                    if on_error is not None:
+                        results.append(list(
+                            schema_validator(res.res, res, on_error=on_error)
+                        ))
+                    else:
+                        results.append(list(res))
                 else:
                     collections.deque(res, maxlen=0)
         except UniqueKeyError as e:
@@ -121,7 +124,5 @@ def process(self):
         return ds.dp, ds.merge_stats()

     def results(self, on_error=None):
-        if on_error is None:
-            on_error = raise_exception
-        ds, results = self.safe_process(on_error=on_error)
+        ds, results = self.safe_process(return_results=True, on_error=on_error)
         return results, ds.dp, ds.merge_stats()
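
The new return_results flag means rows are only accumulated when the caller asks for them; otherwise each resource is drained with collections.deque(..., maxlen=0). A minimal stdlib-only sketch (not part of the commit) of why that idiom is cheap:

    import collections

    def drain(rows):
        # A zero-length deque pulls every item from the iterator and
        # immediately discards it, so the stream's side effects (casting,
        # stats collection) still run but nothing is kept in memory.
        collections.deque(rows, maxlen=0)

    drain(str(x) for x in range(1_000_000))  # constant memory, no list built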
3 changes: 2 additions & 1 deletion dataflows/base/flow.py
@@ -2,13 +2,14 @@
 from collections.abc import Iterable

 from .datastream_processor import DataStreamProcessor
+from .schema_validator import raise_exception


 class Flow:
     def __init__(self, *args):
         self.chain = args

-    def results(self, on_error=None):
+    def results(self, on_error=raise_exception):
         return self._chain().results(on_error=on_error)

     def process(self):
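
Making raise_exception the explicit default (instead of substituting it for None inside DataStreamProcessor.results) frees on_error=None to mean "skip row-by-row schema validation". A hedged usage sketch — 'data.csv' is a hypothetical input file:

    from dataflows import Flow, load

    # Default: every row passes through schema_validator; a bad row raises.
    results, dp, stats = Flow(load('data.csv')).results()

    # Fast path enabled by this commit: no per-row validation.
    results, dp, stats = Flow(load('data.csv')).results(on_error=None)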
16 changes: 11 additions & 5 deletions dataflows/processors/load.py
@@ -175,9 +175,10 @@ def safe_process_datapackage(self, dp: Package):
             descriptor['encoding'] = self.options['encoding']
         self.options['custom_parsers'] = self.get_custom_parsers(self.options.get('custom_parsers'))
         self.options.setdefault('ignore_blank_headers', True)
+        if 'headers' not in self.options:
+            self.options.setdefault('skip_rows', [{'type': 'preset', 'value': 'auto'}])
         self.options.setdefault('headers', 1)
         self.options.setdefault('sample_size', 1000)
-        self.options.setdefault('skip_rows', [{'type': 'preset', 'value': 'auto'}])
         stream: Stream = Stream(self.load_source, **self.options).open()
         if len(stream.headers) != len(set(stream.headers)):
             if not self.deduplicate_headers:
@@ -215,11 +216,16 @@ def safe_process_datapackage(self, dp: Package):
         return dp

     def stripper(self, iterator):
+        whitespace = set(' \t\n\r')
         for r in iterator:
-            yield dict(
-                (k, v.strip()) if isinstance(v, str) else (k, v)
-                for k, v in r.items()
-            )
+            for k, v in r.items():
+                if v and isinstance(v, str) and (v[-1] in whitespace or v[0] in whitespace):
+                    r[k] = v.strip()
+            yield r
+            # yield dict(
+            #     (k, v.strip()) if isinstance(v, str) else (k, v)
+            #     for k, v in r.items()
+            # )

     def limiter(self, iterator):
         count = 0
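
The rewritten stripper cuts two per-row costs: the row dict is mutated in place instead of being rebuilt, and str.strip() only runs when the first or last character is actually whitespace. A standalone sketch of the same idea (stdlib only, names are illustrative):

    WHITESPACE = set(' \t\n\r')

    def strip_row(row):
        # Assigning to an existing key while iterating items() is safe here
        # because the dict never changes size.
        for k, v in row.items():
            if v and isinstance(v, str) and (v[0] in WHITESPACE or v[-1] in WHITESPACE):
                row[k] = v.strip()
        return row

    assert strip_row({'a': ' x ', 'b': 'y', 'n': 3}) == {'a': 'x', 'b': 'y', 'n': 3}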
2 changes: 1 addition & 1 deletion setup.py
@@ -23,7 +23,7 @@ def read(*paths):
 INSTALL_REQUIRES = [
     'dataflows-tabulator>=1.54.0',
     'datapackage>=1.15.4',
-    'tableschema>=1.20.10',
+    'tableschema>=1.20.11',
     'kvfile>=1.1.1',
     'click',
     'jinja2',
29 changes: 29 additions & 0 deletions tests/test_lib.py
@@ -2523,3 +2523,32 @@ def test_dump_to_json_objects():
     for x in ['a', 'b']:
         assert res[50][x] == data[50][x]
         assert res[50]['c'][x] == data[50][x]
+
+
+def aux_profile(filename, fast=False):
+    from dataflows import Flow, load, schema_validator
+    return Flow(
+        load(filename, cast_strategy=load.CAST_WITH_SCHEMA),
+    ).results(on_error=None if fast else schema_validator.raise_exception)[0][0]
+
+@pytest.mark.parametrize('fast', [False, True])
+def test_profile(fast):
+    import csv
+    import tempfile
+    import os
+    from decimal import Decimal
+
+    NUM = 1000000
+
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        filename = os.path.join(tmpdirname, 'test.csv')
+        with open(filename, 'w') as f:
+            writer = csv.writer(f)
+            writer.writerow(['id', 'name', 'age', 'percent'])
+            for i in range(NUM):
+                writer.writerow([str(i), 'name is ' + str(i), i % 100, i / 100])
+        res = aux_profile(filename, fast)
+        for i in range(NUM):
+            assert res[i]['id'] == i
+            assert res[i]['name'] == 'name is ' + str(i)
+            assert res[i]['age'] == i % 100
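
test_profile only asserts that both the validated and the fast (on_error=None) paths return correct values for a million-row CSV; it does not measure runtime. A hypothetical timing harness (timeit here is an illustration, not part of the commit), reusing aux_profile from the test above:

    import timeit

    def compare(filename):
        for fast in (False, True):
            t = timeit.timeit(lambda: aux_profile(filename, fast), number=1)
            print(f'fast={fast}: {t:.1f}s')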
