Skip to content

Commit

Permalink
fix Flake8 complaints
Browse files Browse the repository at this point in the history
- Fix whitespace, indentation, unused variables, duplicate function
- Refactor to extract identical function definitions
  • Loading branch information
ThrawnCA committed Mar 4, 2025
1 parent 2864197 commit abb0545
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 52 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ max-line-length=127

# List ignore rules one per line.
ignore =
C901
E501
W503
8 changes: 4 additions & 4 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ def xloader_data_into_datastore_(input, job_dict, logger):
if hasattr(h, "datastore_rw_resource_url_types"):
datastore_rw_resource_url_types = h.datastore_rw_resource_url_types()
else:
#fallback for 2.10.x or older.
# fallback for 2.10.x or older.
datastore_rw_resource_url_types = ['datastore']

if resource.get('url_type') in datastore_rw_resource_url_types:
logger.info('Ignoring resource - R/W DataStore resources are '
'managed with the Datastore API')
Expand Down Expand Up @@ -267,7 +267,7 @@ def tabulator_load():
logger.warning('Load using COPY failed: %s', e)
logger.info('Trying again with tabulator')
tabulator_load()
except JobTimeoutException as e:
except JobTimeoutException:
try:
tmp_file.close()
except FileNotFoundError:
Expand Down Expand Up @@ -393,7 +393,7 @@ def _download_resource_data(resource, data, api_key, logger):
raise HTTPError(
message=err_message, status_code=None,
request_url=url, response=None)
except JobTimeoutException as e:
except JobTimeoutException:
tmp_file.close()
logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT)
raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT))
Expand Down
42 changes: 17 additions & 25 deletions ckanext/xloader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
if f['id'] in existing_info:
f['info'] = existing_info[f['id']]
f['strip_extra_white'] = existing_info[f['id']].get('strip_extra_white') if 'strip_extra_white' in existing_info[f['id']] \
else existing_fields_by_headers[f['id']].get('strip_extra_white', True)
else existing_fields_by_headers[f['id']].get('strip_extra_white', True)

'''
Delete or truncate existing datastore table before proceeding,
Expand All @@ -222,39 +222,32 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
fields = [
{'id': header_name,
'type': 'text',
'strip_extra_white': True,}
'strip_extra_white': True}
for header_name in headers]

logger.info('Fields: %s', fields)

def _make_whitespace_stripping_iter(super_iter):
def strip_white_space_iter():
for row in super_iter():
if len(row) == len(fields):
for _index, _cell in enumerate(row):
# only strip white space if strip_extra_white is True
if fields[_index].get('strip_extra_white', True) and isinstance(_cell, str):
row[_index] = _cell.strip()
yield row
return strip_white_space_iter

save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter}
try:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result,
skip_rows=skip_rows) as stream:
super_iter = stream.iter
def strip_white_space_iter():
for row in super_iter():
if len(row) == len(fields):
for _index, _cell in enumerate(row):
# only strip white space if strip_extra_white is True
if fields[_index].get('strip_extra_white', True) and isinstance(_cell, str):
row[_index] = _cell.strip()
yield row
stream.iter = strip_white_space_iter
stream.iter = _make_whitespace_stripping_iter(stream.iter)
stream.save(**save_args)
except (EncodingError, UnicodeDecodeError):
with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING,
skip_rows=skip_rows) as stream:
super_iter = stream.iter
def strip_white_space_iter():
for row in super_iter():
if len(row) == len(fields):
for _index, _cell in enumerate(row):
# only strip white space if strip_extra_white is True
if fields[_index].get('strip_extra_white', True) and isinstance(_cell, str):
row[_index] = _cell.strip()
yield row
stream.iter = strip_white_space_iter
stream.iter = _make_whitespace_stripping_iter(stream.iter)
stream.save(**save_args)
csv_filepath = f_write.name

Expand Down Expand Up @@ -284,9 +277,8 @@ def strip_white_space_iter():
raise LoaderError('Could not create the database table: {}'
.format(e))


# datstore_active is switched on by datastore_create - TODO temporarily
# disable it until the load is complete
# datastore_active is switched on by datastore_create
# TODO temporarily disable it until the load is complete

with engine.begin() as conn:
_disable_fulltext_trigger(conn, resource_id)
Expand Down
6 changes: 3 additions & 3 deletions ckanext/xloader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ def receive_validation_report(self, validation_report):
res_dict = toolkit.get_action('resource_show')({'ignore_auth': True},
{'id': validation_report.get('resource_id')})
if (toolkit.asbool(toolkit.config.get('ckanext.xloader.validation.enforce_schema', True))
or res_dict.get('schema', None)) and validation_report.get('status') != 'success':
# A schema is present, or required to be present
return
or res_dict.get('schema', None)) and validation_report.get('status') != 'success':
# A schema is present, or required to be present
return
# if validation is running in async mode, it is running from the redis workers.
# thus we need to do sync=True to have Xloader put the job at the front of the queue.
sync = toolkit.asbool(toolkit.config.get(u'ckanext.validation.run_on_update_async', True))
Expand Down
3 changes: 3 additions & 0 deletions ckanext/xloader/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ def get_response(download_url, headers):
resp.headers = headers
return resp


def get_large_response(download_url, headers):
"""Mock jobs.get_response() method to fake a large file."""
resp = Response()
resp.raw = io.BytesIO(_TEST_FILE_CONTENT.encode())
resp.headers = {'content-length': 2000000000}
return resp


def get_large_data_response(download_url, headers):
"""Mock jobs.get_response() method."""
resp = Response()
Expand All @@ -42,6 +44,7 @@ def get_large_data_response(download_url, headers):
resp.headers = headers
return resp


def _get_temp_files(dir='/tmp'):
return [os.path.join(dir, f) for f in os.listdir(dir) if os.path.isfile(os.path.join(dir, f))]

Expand Down
17 changes: 0 additions & 17 deletions ckanext/xloader/tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1177,23 +1177,6 @@ def test_simple_large_file(self, Session):
u"text",
]

def test_simple_large_file(self, Session):
csv_filepath = get_sample_filepath("simple-large.csv")
resource = factories.Resource()
resource_id = resource['id']
loader.load_table(
csv_filepath,
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)
assert self._get_column_types(Session, resource_id) == [
u"int4",
u"tsvector",
u"numeric",
u"text",
]

def test_with_mixed_types(self, Session):
csv_filepath = get_sample_filepath("mixed_numeric_string_sample.csv")
resource = factories.Resource()
Expand Down
6 changes: 3 additions & 3 deletions ckanext/xloader/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def test_require_validation(self, monkeypatch):

# TODO: test IPipeValidation
assert not func.called # because of the validation_status not being `success`
func.called = None # reset
func.called = None # reset

helpers.call_action(
"resource_update",
Expand Down Expand Up @@ -118,7 +118,7 @@ def test_enforce_validation_schema(self, monkeypatch):

# TODO: test IPipeValidation
assert not func.called # because of the schema being empty
func.called = None # reset
func.called = None # reset

helpers.call_action(
"resource_update",
Expand All @@ -132,7 +132,7 @@ def test_enforce_validation_schema(self, monkeypatch):

# TODO: test IPipeValidation
assert not func.called # because of the validation_status not being `success` and there is a schema
func.called = None # reset
func.called = None # reset

helpers.call_action(
"resource_update",
Expand Down

0 comments on commit abb0545

Please sign in to comment.