diff --git a/.flake8 b/.flake8 index 32068ca7..973dec8d 100644 --- a/.flake8 +++ b/.flake8 @@ -17,4 +17,6 @@ max-line-length=127 # List ignore rules one per line. ignore = + C901 + E501 W503 diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 75b31ce6..5013627a 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -182,9 +182,9 @@ def xloader_data_into_datastore_(input, job_dict, logger): if hasattr(h, "datastore_rw_resource_url_types"): datastore_rw_resource_url_types = h.datastore_rw_resource_url_types() else: - #fallback for 2.10.x or older. + # fallback for 2.10.x or older. datastore_rw_resource_url_types = ['datastore'] - + if resource.get('url_type') in datastore_rw_resource_url_types: logger.info('Ignoring resource - R/W DataStore resources are ' 'managed with the Datastore API') @@ -267,7 +267,7 @@ def tabulator_load(): logger.warning('Load using COPY failed: %s', e) logger.info('Trying again with tabulator') tabulator_load() - except JobTimeoutException as e: + except JobTimeoutException: try: tmp_file.close() except FileNotFoundError: @@ -393,7 +393,7 @@ def _download_resource_data(resource, data, api_key, logger): raise HTTPError( message=err_message, status_code=None, request_url=url, response=None) - except JobTimeoutException as e: + except JobTimeoutException: tmp_file.close() logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index bb9dda48..f45b579c 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -204,7 +204,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): if f['id'] in existing_info: f['info'] = existing_info[f['id']] f['strip_extra_white'] = existing_info[f['id']].get('strip_extra_white') if 'strip_extra_white' in existing_info[f['id']] \ - else existing_fields_by_headers[f['id']].get('strip_extra_white', True) + else existing_fields_by_headers[f['id']].get('strip_extra_white', True) ''' Delete or truncate existing datastore table before proceeding, @@ -222,39 +222,32 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): fields = [ {'id': header_name, 'type': 'text', - 'strip_extra_white': True,} + 'strip_extra_white': True} for header_name in headers] logger.info('Fields: %s', fields) + def _make_whitespace_stripping_iter(super_iter): + def strip_white_space_iter(): + for row in super_iter(): + if len(row) == len(fields): + for _index, _cell in enumerate(row): + # only strip white space if strip_extra_white is True + if fields[_index].get('strip_extra_white', True) and isinstance(_cell, str): + row[_index] = _cell.strip() + yield row + return strip_white_space_iter + save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} try: with UnknownEncodingStream(csv_filepath, file_format, decoding_result, skip_rows=skip_rows) as stream: - super_iter = stream.iter - def strip_white_space_iter(): - for row in super_iter(): - if len(row) == len(fields): - for _index, _cell in enumerate(row): - # only strip white space if strip_extra_white is True - if fields[_index].get('strip_extra_white', True) and isinstance(_cell, str): - row[_index] = _cell.strip() - yield row - stream.iter = strip_white_space_iter + stream.iter = _make_whitespace_stripping_iter(stream.iter) stream.save(**save_args) except (EncodingError, UnicodeDecodeError): with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, skip_rows=skip_rows) as stream: - super_iter = stream.iter - def strip_white_space_iter(): - for row in super_iter(): - if len(row) == len(fields): - for _index, _cell in enumerate(row): - # only strip white space if strip_extra_white is True - if fields[_index].get('strip_extra_white', True) and isinstance(_cell, str): - row[_index] = _cell.strip() - yield row - stream.iter = strip_white_space_iter + stream.iter = _make_whitespace_stripping_iter(stream.iter) stream.save(**save_args) csv_filepath = f_write.name @@ -284,9 +277,8 @@ def strip_white_space_iter(): raise LoaderError('Could not create the database table: {}' .format(e)) - - # datstore_active is switched on by datastore_create - TODO temporarily - # disable it until the load is complete + # datastore_active is switched on by datastore_create + # TODO temporarily disable it until the load is complete with engine.begin() as conn: _disable_fulltext_trigger(conn, resource_id) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 5c0084d9..75999127 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -91,9 +91,9 @@ def receive_validation_report(self, validation_report): res_dict = toolkit.get_action('resource_show')({'ignore_auth': True}, {'id': validation_report.get('resource_id')}) if (toolkit.asbool(toolkit.config.get('ckanext.xloader.validation.enforce_schema', True)) - or res_dict.get('schema', None)) and validation_report.get('status') != 'success': - # A schema is present, or required to be present - return + or res_dict.get('schema', None)) and validation_report.get('status') != 'success': + # A schema is present, or required to be present + return # if validation is running in async mode, it is running from the redis workers. # thus we need to do sync=True to have Xloader put the job at the front of the queue. sync = toolkit.asbool(toolkit.config.get(u'ckanext.validation.run_on_update_async', True)) diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index c67f4fe6..0105789d 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -27,6 +27,7 @@ def get_response(download_url, headers): resp.headers = headers return resp + def get_large_response(download_url, headers): """Mock jobs.get_response() method to fake a large file.""" resp = Response() @@ -34,6 +35,7 @@ def get_large_response(download_url, headers): resp.headers = {'content-length': 2000000000} return resp + def get_large_data_response(download_url, headers): """Mock jobs.get_response() method.""" resp = Response() @@ -42,6 +44,7 @@ def get_large_data_response(download_url, headers): resp.headers = headers return resp + def _get_temp_files(dir='/tmp'): return [os.path.join(dir, f) for f in os.listdir(dir) if os.path.isfile(os.path.join(dir, f))] diff --git a/ckanext/xloader/tests/test_loader.py b/ckanext/xloader/tests/test_loader.py index 7ff73446..09f87887 100644 --- a/ckanext/xloader/tests/test_loader.py +++ b/ckanext/xloader/tests/test_loader.py @@ -1177,23 +1177,6 @@ def test_simple_large_file(self, Session): u"text", ] - def test_simple_large_file(self, Session): - csv_filepath = get_sample_filepath("simple-large.csv") - resource = factories.Resource() - resource_id = resource['id'] - loader.load_table( - csv_filepath, - resource_id=resource_id, - mimetype="text/csv", - logger=logger, - ) - assert self._get_column_types(Session, resource_id) == [ - u"int4", - u"tsvector", - u"numeric", - u"text", - ] - def test_with_mixed_types(self, Session): csv_filepath = get_sample_filepath("mixed_numeric_string_sample.csv") resource = factories.Resource() diff --git a/ckanext/xloader/tests/test_plugin.py b/ckanext/xloader/tests/test_plugin.py index f22dafbd..f6a0590f 100644 --- a/ckanext/xloader/tests/test_plugin.py +++ b/ckanext/xloader/tests/test_plugin.py @@ -81,7 +81,7 @@ def test_require_validation(self, monkeypatch): # TODO: test IPipeValidation assert not func.called # because of the validation_status not being `success` - func.called = None # reset + func.called = None # reset helpers.call_action( "resource_update", @@ -118,7 +118,7 @@ def test_enforce_validation_schema(self, monkeypatch): # TODO: test IPipeValidation assert not func.called # because of the schema being empty - func.called = None # reset + func.called = None # reset helpers.call_action( "resource_update", @@ -132,7 +132,7 @@ def test_enforce_validation_schema(self, monkeypatch): # TODO: test IPipeValidation assert not func.called # because of the validation_status not being `success` and there is a schema - func.called = None # reset + func.called = None # reset helpers.call_action( "resource_update",