diff --git a/dataflows/VERSION b/dataflows/VERSION
index 7d85683..d1d899f 100644
--- a/dataflows/VERSION
+++ b/dataflows/VERSION
@@ -1 +1 @@
-0.5.4
+0.5.5
diff --git a/dataflows/base/datastream_processor.py b/dataflows/base/datastream_processor.py
index 250e25d..80311cb 100644
--- a/dataflows/base/datastream_processor.py
+++ b/dataflows/base/datastream_processor.py
@@ -96,15 +96,18 @@ def raise_exception(self, cause):
             raise error from cause
         raise cause
 
-    def safe_process(self, on_error=None):
+    def safe_process(self, return_results=False, on_error=None):
         results = []
         try:
             ds = self._process()
             for res in ds.res_iter:
-                if on_error is not None:
-                    results.append(list(
-                        schema_validator(res.res, res, on_error=on_error)
-                    ))
+                if return_results:
+                    if on_error is not None:
+                        results.append(list(
+                            schema_validator(res.res, res, on_error=on_error)
+                        ))
+                    else:
+                        results.append(list(res))
                 else:
                     collections.deque(res, maxlen=0)
         except UniqueKeyError as e:
@@ -121,7 +124,5 @@ def process(self):
         return ds.dp, ds.merge_stats()
 
     def results(self, on_error=None):
-        if on_error is None:
-            on_error = raise_exception
-        ds, results = self.safe_process(on_error=on_error)
+        ds, results = self.safe_process(return_results=True, on_error=on_error)
         return results, ds.dp, ds.merge_stats()
diff --git a/dataflows/base/flow.py b/dataflows/base/flow.py
index 6318015..9ddf226 100644
--- a/dataflows/base/flow.py
+++ b/dataflows/base/flow.py
@@ -2,13 +2,14 @@
 from collections.abc import Iterable
 from .datastream_processor import DataStreamProcessor
+from .schema_validator import raise_exception
 
 
 class Flow:
 
     def __init__(self, *args):
         self.chain = args
 
-    def results(self, on_error=None):
+    def results(self, on_error=raise_exception):
         return self._chain().results(on_error=on_error)
 
     def process(self):
diff --git a/dataflows/processors/load.py b/dataflows/processors/load.py
index 7ace526..8cb3ad7 100644
--- a/dataflows/processors/load.py
+++ b/dataflows/processors/load.py
@@ -175,9 +175,10 @@ def safe_process_datapackage(self, dp: Package):
                 descriptor['encoding'] = self.options['encoding']
             self.options['custom_parsers'] = self.get_custom_parsers(self.options.get('custom_parsers'))
             self.options.setdefault('ignore_blank_headers', True)
+            if 'headers' not in self.options:
+                self.options.setdefault('skip_rows', [{'type': 'preset', 'value': 'auto'}])
             self.options.setdefault('headers', 1)
             self.options.setdefault('sample_size', 1000)
-            self.options.setdefault('skip_rows', [{'type': 'preset', 'value': 'auto'}])
             stream: Stream = Stream(self.load_source, **self.options).open()
             if len(stream.headers) != len(set(stream.headers)):
                 if not self.deduplicate_headers:
@@ -215,11 +216,16 @@
         return dp
 
     def stripper(self, iterator):
+        whitespace = set(' \t\n\r')
         for r in iterator:
-            yield dict(
-                (k, v.strip()) if isinstance(v, str) else (k, v)
-                for k, v in r.items()
-            )
+            for k, v in r.items():
+                if v and isinstance(v, str) and (v[-1] in whitespace or v[0] in whitespace):
+                    r[k] = v.strip()
+            yield r
+            # yield dict(
+            #     (k, v.strip()) if isinstance(v, str) else (k, v)
+            #     for k, v in r.items()
+            # )
 
     def limiter(self, iterator):
         count = 0
diff --git a/setup.py b/setup.py
index 8565896..31be5bf 100644
--- a/setup.py
+++ b/setup.py
@@ -23,7 +23,7 @@ def read(*paths):
 INSTALL_REQUIRES = [
     'dataflows-tabulator>=1.54.0',
     'datapackage>=1.15.4',
-    'tableschema>=1.20.10',
+    'tableschema>=1.20.11',
     'kvfile>=1.1.1',
     'click',
     'jinja2',
diff --git a/tests/test_lib.py b/tests/test_lib.py
index 34599f9..2563b88 100644
--- a/tests/test_lib.py
+++ b/tests/test_lib.py
@@ -2523,3 +2523,32 @@ def test_dump_to_json_objects():
     for x in ['a', 'b']:
         assert res[50][x] == data[50][x]
         assert res[50]['c'][x] == data[50][x]
+
+
+def aux_profile(filename, fast=False):
+    from dataflows import Flow, load, schema_validator
+    return Flow(
+        load(filename, cast_strategy=load.CAST_WITH_SCHEMA),
+    ).results(on_error=None if fast else schema_validator.raise_exception)[0][0]
+
+@pytest.mark.parametrize('fast', [False, True])
+def test_profile(fast):
+    import csv
+    import tempfile
+    import os
+    from decimal import Decimal
+
+    NUM = 1000000
+
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        filename = os.path.join(tmpdirname, 'test.csv')
+        with open(filename, 'w') as f:
+            writer = csv.writer(f)
+            writer.writerow(['id', 'name', 'age', 'percent'])
+            for i in range(NUM):
+                writer.writerow([str(i), 'name is ' + str(i), i % 100, i / 100])
+        res = aux_profile(filename, fast)
+        for i in range(NUM):
+            assert res[i]['id'] == i
+            assert res[i]['name'] == 'name is ' + str(i)
+            assert res[i]['age'] == i % 100
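Note: the following is a minimal usage sketch, not part of the patch, illustrating the behaviour change it introduces. Flow.results() still validates each row by default (on_error now defaults to raise_exception at the Flow level), while passing on_error=None takes the new fast path in safe_process(), collecting rows without running schema_validator at all. The file name 'data.csv' is a placeholder.

from dataflows import Flow, load, schema_validator

# Default path: rows are cast and validated against the inferred schema;
# the first schema error raises (equivalent to on_error=schema_validator.raise_exception).
# results() returns (rows-per-resource, datapackage, stats).
rows, dp, stats = Flow(
    load('data.csv', cast_strategy=load.CAST_WITH_SCHEMA),  # hypothetical input file
).results()

# Fast path: on_error=None skips per-row schema validation entirely
# while still collecting results (safe_process(return_results=True) internally).
fast_rows, _, _ = Flow(
    load('data.csv', cast_strategy=load.CAST_WITH_SCHEMA),
).results(on_error=None)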