diff --git a/.gitignore b/.gitignore index 91d6462..70523d2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.pyc +*~ .coverage dist build diff --git a/README.rst b/README.rst index a38c2cd..6b75aae 100644 --- a/README.rst +++ b/README.rst @@ -2,6 +2,10 @@ csvvalidator ============ +**This package is no longer maintained.** Functionality for validating tables has been migrated to `petl <https://petl.readthedocs.io/en/stable/transform.html#petl.transform.validation.validate>`_. + +---- + This module provides some simple utilities for validating data contained in CSV files, or other similar data sources. diff --git a/csvvalidator.py b/csvvalidator.py index 693e975..be66ec2 100644 --- a/csvvalidator.py +++ b/csvvalidator.py @@ -1,21 +1,21 @@ -""" +""" -This module provides some simple utilities for validating data contained in CSV +This module provides some simple utilities for validating data contained in CSV files, or other similar data sources. -Note that the `csvvalidator` module is intended to be used in combination with -the standard Python `csv` module. The `csvvalidator` module **will not** -validate the *syntax* of a CSV file. Rather, the `csvvalidator` module can be -used to validate any source of row-oriented data, such as is provided by a +Note that the `csvvalidator` module is intended to be used in combination with +the standard Python `csv` module. The `csvvalidator` module **will not** +validate the *syntax* of a CSV file. Rather, the `csvvalidator` module can be +used to validate any source of row-oriented data, such as is provided by a `csv.reader` object. 
-I.e., if you want to validate data from a CSV file, you have to first construct -a CSV reader using the standard Python `csv` module, specifying the appropriate -dialect, and then pass the CSV reader as the source of data to either the +I.e., if you want to validate data from a CSV file, you have to first construct +a CSV reader using the standard Python `csv` module, specifying the appropriate +dialect, and then pass the CSV reader as the source of data to either the `CSVValidator.validate` or the `CSVValidator.ivalidate` method. -The `CSVValidator` class is the foundation for all validator objects that are -capable of validating CSV data. +The `CSVValidator` class is the foundation for all validator objects that are +capable of validating CSV data. You can use the CSVValidator class to dynamically construct a validator, e.g.:: @@ -24,55 +24,55 @@ from csvvalidator import * field_names = ( - 'study_id', - 'patient_id', - 'gender', - 'age_years', + 'study_id', + 'patient_id', + 'gender', + 'age_years', 'age_months', 'date_inclusion' ) validator = CSVValidator(field_names) - + # basic header and record length checks validator.add_header_check('EX1', 'bad header') validator.add_record_length_check('EX2', 'unexpected record length') - + # some simple value checks - validator.add_value_check('study_id', int, + validator.add_value_check('study_id', int, 'EX3', 'study id must be an integer') - validator.add_value_check('patient_id', int, + validator.add_value_check('patient_id', int, 'EX4', 'patient id must be an integer') - validator.add_value_check('gender', enumeration('M', 'F'), + validator.add_value_check('gender', enumeration('M', 'F'), 'EX5', 'invalid gender') - validator.add_value_check('age_years', number_range_inclusive(0, 120, int), + validator.add_value_check('age_years', number_range_inclusive(0, 120, int), 'EX6', 'invalid age in years') validator.add_value_check('date_inclusion', datetime_string('%Y-%m-%d'), 'EX7', 'invalid date') - + # a more complicated 
record check def check_age_variables(r): age_years = int(r['age_years']) age_months = int(r['age_months']) - valid = (age_months >= age_years * 12 and + valid = (age_months >= age_years * 12 and age_months % age_years < 12) if not valid: raise RecordError('EX8', 'invalid age variables') validator.add_record_check(check_age_variables) - # validate the data and write problems to stdout + # validate the data and write problems to stdout data = csv.reader('/path/to/data.csv', delimiter='\t') problems = validator.validate(data) write_problems(problems, sys.stdout) -For more complex use cases you can also sub-class `CSVValidator` to define +For more complex use cases you can also sub-class `CSVValidator` to define re-usable validator classes for specific data sources. -The source code for this module lives at: +The source code for this module lives at: https://github.com/alimanfoo/csvvalidator -For a complete account of all of the functionality available from this module, +For a complete account of all of the functionality available from this module, see the example.py and tests.py modules in the source code repository. """ @@ -109,14 +109,14 @@ def check_age_variables(r): class RecordError(Exception): """Exception representing a validation problem in a record.""" - - + + def __init__(self, code=None, message=None, details=None): self.code = code self.message = message self.details = details - - + + def __str__(self): return repr((self.code, self.message, self.details)) @@ -127,15 +127,15 @@ def __repr__(self): class CSVValidator(object): """ - Instances of this class can be configured to run a variety of different + Instances of this class can be configured to run a variety of different types of validation check on a CSV-like data source. """ - + def __init__(self, field_names): """ - Instantiate a `CSVValidator`, supplying expected `field_names` as a + Instantiate a `CSVValidator`, supplying expected `field_names` as a sequence of strings. 
""" @@ -150,55 +150,55 @@ def __init__(self, field_names): self._unique_checks = [] self._skips = [] - - def add_header_check(self, - code=HEADER_CHECK_FAILED, + + def add_header_check(self, + code=HEADER_CHECK_FAILED, message=MESSAGES[HEADER_CHECK_FAILED]): """ - Add a header check, i.e., check whether the header record is consistent + Add a header check, i.e., check whether the header record is consistent with the expected field names. Arguments --------- - `code` - problem code to report if the header record is not valid, + `code` - problem code to report if the header record is not valid, defaults to `HEADER_CHECK_FAILED` `message` - problem message to report if a value is not valid """ - + t = code, message self._header_checks.append(t) - - + + def add_record_length_check(self, - code=RECORD_LENGTH_CHECK_FAILED, + code=RECORD_LENGTH_CHECK_FAILED, message=MESSAGES[RECORD_LENGTH_CHECK_FAILED], modulus=1): """ Add a record length check, i.e., check whether the length of a record is consistent with the number of expected fields. 
- + Arguments --------- - `code` - problem code to report if a record is not valid, defaults to + `code` - problem code to report if a record is not valid, defaults to `RECORD_LENGTH_CHECK_FAILED` `message` - problem message to report if a record is not valid - `modulus` - apply the check to every nth record, defaults to 1 (check + `modulus` - apply the check to every nth record, defaults to 1 (check every record) - + """ - + t = code, message, modulus self._record_length_checks.append(t) - - - def add_value_check(self, field_name, value_check, - code=VALUE_CHECK_FAILED, + + + def add_value_check(self, field_name, value_check, + code=VALUE_CHECK_FAILED, message=MESSAGES[VALUE_CHECK_FAILED], modulus=1): """ @@ -207,18 +207,18 @@ def add_value_check(self, field_name, value_check, Arguments --------- - `field_name` - the name of the field to attach the value check function + `field_name` - the name of the field to attach the value check function to - `value_check` - a function that accepts a single argument (a value) and + `value_check` - a function that accepts a single argument (a value) and raises a `ValueError` if the value is not valid - `code` - problem code to report if a value is not valid, defaults to + `code` - problem code to report if a value is not valid, defaults to `VALUE_CHECK_FAILED` `message` - problem message to report if a value is not valid - `modulus` - apply the check to every nth record, defaults to 1 (check + `modulus` - apply the check to every nth record, defaults to 1 (check every record) """ @@ -226,37 +226,37 @@ def add_value_check(self, field_name, value_check, # guard conditions assert field_name in self._field_names, 'unexpected field name: %s' % field_name assert callable(value_check), 'value check must be a callable function' - + t = field_name, value_check, code, message, modulus self._value_checks.append(t) - - + + def add_value_predicate(self, field_name, value_predicate, - code=VALUE_PREDICATE_FALSE, + code=VALUE_PREDICATE_FALSE, 
message=MESSAGES[VALUE_PREDICATE_FALSE], modulus=1): """ Add a value predicate function for the specified field. - + N.B., everything you can do with value predicates can also be done with value check functions, whether you use one or the other is a matter of style. - + Arguments --------- - `field_name` - the name of the field to attach the value predicate + `field_name` - the name of the field to attach the value predicate function to - `value_predicate` - a function that accepts a single argument (a value) + `value_predicate` - a function that accepts a single argument (a value) and returns False if the value is not valid - `code` - problem code to report if a value is not valid, defaults to + `code` - problem code to report if a value is not valid, defaults to `VALUE_PREDICATE_FALSE` `message` - problem message to report if a value is not valid - `modulus` - apply the check to every nth record, defaults to 1 (check + `modulus` - apply the check to every nth record, defaults to 1 (check every record) """ @@ -266,20 +266,20 @@ def add_value_predicate(self, field_name, value_predicate, t = field_name, value_predicate, code, message, modulus self._value_predicates.append(t) - - + + def add_record_check(self, record_check, modulus=1): """ Add a record check function. 
- + Arguments --------- `record_check` - a function that accepts a single argument (a record as - a dictionary of values indexed by field name) and raises a + a dictionary of values indexed by field name) and raises a `RecordError` if the record is not valid - `modulus` - apply the check to every nth record, defaults to 1 (check + `modulus` - apply the check to every nth record, defaults to 1 (check every record) """ @@ -288,15 +288,15 @@ def add_record_check(self, record_check, modulus=1): t = record_check, modulus self._record_checks.append(t) - - + + def add_record_predicate(self, record_predicate, - code=RECORD_PREDICATE_FALSE, + code=RECORD_PREDICATE_FALSE, message=MESSAGES[RECORD_PREDICATE_FALSE], modulus=1): """ Add a record predicate function. - + N.B., everything you can do with record predicates can also be done with record check functions, whether you use one or the other is a matter of style. @@ -304,16 +304,16 @@ def add_record_predicate(self, record_predicate, Arguments --------- - `record_predicate` - a function that accepts a single argument (a record + `record_predicate` - a function that accepts a single argument (a record as a dictionary of values indexed by field name) and returns False if the value is not valid - `code` - problem code to report if a record is not valid, defaults to + `code` - problem code to report if a record is not valid, defaults to `RECORD_PREDICATE_FALSE` `message` - problem message to report if a record is not valid - `modulus` - apply the check to every nth record, defaults to 1 (check + `modulus` - apply the check to every nth record, defaults to 1 (check every record) """ @@ -322,10 +322,10 @@ def add_record_predicate(self, record_predicate, t = record_predicate, code, message, modulus self._record_predicates.append(t) - - + + def add_unique_check(self, key, - code=UNIQUE_CHECK_FAILED, + code=UNIQUE_CHECK_FAILED, message=MESSAGES[UNIQUE_CHECK_FAILED]): """ Add a unique check on a single column or combination of 
columns. @@ -333,39 +333,39 @@ def add_unique_check(self, key, Arguments --------- - `key` - a single field name (string) specifying a field in which all - values are expected to be unique, or a sequence of field names (tuple - or list of strings) specifying a compound key + `key` - a single field name (string) specifying a field in which all + values are expected to be unique, or a sequence of field names (tuple + or list of strings) specifying a compound key - `code` - problem code to report if a record is not valid, defaults to + `code` - problem code to report if a record is not valid, defaults to `UNIQUE_CHECK_FAILED` `message` - problem message to report if a record is not valid """ - - if isinstance(key, basestring): + + if isinstance(key, basestring): assert key in self._field_names, 'unexpected field name: %s' % key else: for f in key: assert f in self._field_names, 'unexpected field name: %s' % key t = key, code, message self._unique_checks.append(t) - - + + def add_skip(self, skip): """ - Add a `skip` function which accepts a single argument (a record as a - sequence of values) and returns True if all checks on the record should + Add a `skip` function which accepts a single argument (a record as a + sequence of values) and returns True if all checks on the record should be skipped. - + """ assert callable(skip), 'skip must be a callable function' self._skips.append(skip) - - - def validate(self, data, + + + def validate(self, data, expect_header_row=True, ignore_lines=0, summarize=False, @@ -374,45 +374,45 @@ def validate(self, data, report_unexpected_exceptions=True): """ Validate `data` and return a list of validation problems found. - + Arguments --------- - `data` - any source of row-oriented data, e.g., as provided by a + `data` - any source of row-oriented data, e.g., as provided by a `csv.reader`, or a list of lists of strings, or ... 
- - `expect_header_row` - does the data contain a header row (i.e., the + + `expect_header_row` - does the data contain a header row (i.e., the first record is a list of field names)? Defaults to True. - + `ignore_lines` - ignore n lines (rows) at the beginning of the data - + `summarize` - only report problem codes, no other details - + `limit` - report at most n problems - + `context` - a dictionary of any additional information to be added to - any problems found - useful if problems are being aggregated from + any problems found - useful if problems are being aggregated from multiple validators - + `report_unexpected_exceptions` - value check function, value predicates, - record check functions, record predicates, and other user-supplied + record check functions, record predicates, and other user-supplied validation functions may raise unexpected exceptions. If this argument - is true, any unexpected exceptions will be reported as validation - problems; if False, unexpected exceptions will be handled silently. + is true, any unexpected exceptions will be reported as validation + problems; if False, unexpected exceptions will be handled silently. """ problems = list() - problem_generator = self.ivalidate(data, expect_header_row, - ignore_lines, summarize, context, + problem_generator = self.ivalidate(data, expect_header_row, + ignore_lines, summarize, context, report_unexpected_exceptions) for i, p in enumerate(problem_generator): if not limit or i < limit: problems.append(p) return problems - - - def ivalidate(self, data, + + + def ivalidate(self, data, expect_header_row=True, ignore_lines=0, summarize=False, @@ -423,32 +423,32 @@ def ivalidate(self, data, Use this function rather than validate() if you expect a large number of problems. - + Arguments --------- - `data` - any source of row-oriented data, e.g., as provided by a + `data` - any source of row-oriented data, e.g., as provided by a `csv.reader`, or a list of lists of strings, or ... 
- - `expect_header_row` - does the data contain a header row (i.e., the + + `expect_header_row` - does the data contain a header row (i.e., the first record is a list of field names)? Defaults to True. - + `ignore_lines` - ignore n lines (rows) at the beginning of the data - + `summarize` - only report problem codes, no other details - + `context` - a dictionary of any additional information to be added to - any problems found - useful if problems are being aggregated from + any problems found - useful if problems are being aggregated from multiple validators - + `report_unexpected_exceptions` - value check function, value predicates, - record check functions, record predicates, and other user-supplied + record check functions, record predicates, and other user-supplied validation functions may raise unexpected exceptions. If this argument - is true, any unexpected exceptions will be reported as validation - problems; if False, unexpected exceptions will be handled silently. - + is true, any unexpected exceptions will be reported as validation + problems; if False, unexpected exceptions will be handled silently. 
+ """ - + unique_sets = self._init_unique_sets() # used for unique checks for i, r in enumerate(data): if expect_header_row and i == ignore_lines: @@ -458,7 +458,7 @@ def ivalidate(self, data, elif i >= ignore_lines: # r is a data row skip = False - for p in self._apply_skips(i, r, summarize, + for p in self._apply_skips(i, r, summarize, report_unexpected_exceptions, context): if p is True: @@ -466,61 +466,61 @@ def ivalidate(self, data, else: yield p if not skip: - for p in self._apply_each_methods(i, r, summarize, + for p in self._apply_each_methods(i, r, summarize, report_unexpected_exceptions, context): yield p # may yield a problem if an exception is raised - for p in self._apply_value_checks(i, r, summarize, + for p in self._apply_value_checks(i, r, summarize, report_unexpected_exceptions, context): yield p - for p in self._apply_record_length_checks(i, r, summarize, + for p in self._apply_record_length_checks(i, r, summarize, context): yield p - for p in self._apply_value_predicates(i, r, summarize, + for p in self._apply_value_predicates(i, r, summarize, report_unexpected_exceptions, context): yield p - for p in self._apply_record_checks(i, r, summarize, + for p in self._apply_record_checks(i, r, summarize, report_unexpected_exceptions, context): yield p - for p in self._apply_record_predicates(i, r, summarize, + for p in self._apply_record_predicates(i, r, summarize, report_unexpected_exceptions, context): yield p for p in self._apply_unique_checks(i, r, unique_sets, summarize): yield p - for p in self._apply_check_methods(i, r, summarize, + for p in self._apply_check_methods(i, r, summarize, report_unexpected_exceptions, context): yield p - for p in self._apply_assert_methods(i, r, summarize, + for p in self._apply_assert_methods(i, r, summarize, report_unexpected_exceptions, context): yield p - for p in self._apply_finally_assert_methods(summarize, + for p in self._apply_finally_assert_methods(summarize, report_unexpected_exceptions, context): yield p - 
- + + def _init_unique_sets(self): """Initialise sets used for uniqueness checking.""" - + ks = dict() for t in self._unique_checks: key = t[0] ks[key] = set() # empty set return ks - - - def _apply_value_checks(self, i, r, - summarize=False, + + + def _apply_value_checks(self, i, r, + summarize=False, report_unexpected_exceptions=True, context=None): """Apply value check functions on the given record `r`.""" - + for field_name, check, code, message, modulus in self._value_checks: if i % modulus == 0: # support sampling fi = self._field_names.index(field_name) @@ -550,15 +550,15 @@ def _apply_value_checks(self, i, r, p['value'] = value p['record'] = r p['exception'] = e - p['function'] = '%s: %s' % (check.__name__, + p['function'] = '%s: %s' % (check.__name__, check.__doc__) if context is not None: p['context'] = context yield p - - + + def _apply_header_checks(self, i, r, summarize=False, context=None): """Apply header checks on the given record `r`.""" - + for code, message in self._header_checks: if tuple(r) != self._field_names: p = {'code': code} @@ -567,14 +567,14 @@ def _apply_header_checks(self, i, r, summarize=False, context=None): p['row'] = i + 1 p['record'] = tuple(r) p['missing'] = set(self._field_names) - set(r) - p['unexpected'] = set(r) - set(self._field_names) + p['unexpected'] = set(r) - set(self._field_names) if context is not None: p['context'] = context yield p - - + + def _apply_record_length_checks(self, i, r, summarize=False, context=None): """Apply record length checks on the given record `r`.""" - + for code, message, modulus in self._record_length_checks: if i % modulus == 0: # support sampling if len(r) != len(self._field_names): @@ -586,14 +586,14 @@ def _apply_record_length_checks(self, i, r, summarize=False, context=None): p['length'] = len(r) if context is not None: p['context'] = context yield p - - - def _apply_value_predicates(self, i, r, - summarize=False, + + + def _apply_value_predicates(self, i, r, + summarize=False, 
report_unexpected_exceptions=True, context=None): """Apply value predicates on the given record `r`.""" - + for field_name, predicate, code, message, modulus in self._value_predicates: if i % modulus == 0: # support sampling fi = self._field_names.index(field_name) @@ -623,18 +623,18 @@ def _apply_value_predicates(self, i, r, p['value'] = value p['record'] = r p['exception'] = e - p['function'] = '%s: %s' % (predicate.__name__, + p['function'] = '%s: %s' % (predicate.__name__, predicate.__doc__) if context is not None: p['context'] = context yield p - def _apply_record_checks(self, i, r, - summarize=False, + def _apply_record_checks(self, i, r, + summarize=False, report_unexpected_exceptions=True, context=None): """Apply record checks on `r`.""" - + for check, modulus in self._record_checks: if i % modulus == 0: # support sampling rdict = self._as_dict(r) @@ -659,18 +659,18 @@ def _apply_record_checks(self, i, r, p['row'] = i + 1 p['record'] = r p['exception'] = e - p['function'] = '%s: %s' % (check.__name__, + p['function'] = '%s: %s' % (check.__name__, check.__doc__) if context is not None: p['context'] = context yield p - - - def _apply_record_predicates(self, i, r, - summarize=False, + + + def _apply_record_predicates(self, i, r, + summarize=False, report_unexpected_exceptions=True, context=None): """Apply record predicates on `r`.""" - + for predicate, code, message, modulus in self._record_predicates: if i % modulus == 0: # support sampling rdict = self._as_dict(r) @@ -692,27 +692,31 @@ def _apply_record_predicates(self, i, r, p['row'] = i + 1 p['record'] = r p['exception'] = e - p['function'] = '%s: %s' % (predicate.__name__, + p['function'] = '%s: %s' % (predicate.__name__, predicate.__doc__) if context is not None: p['context'] = context yield p - - - def _apply_unique_checks(self, i, r, unique_sets, + + + def _apply_unique_checks(self, i, r, unique_sets, summarize=False, context=None): """Apply unique checks on `r`.""" - + for key, code, message in 
self._unique_checks: value = None values = unique_sets[key] if isinstance(key, basestring): # assume key is a field name fi = self._field_names.index(key) + if fi >= len(r): + continue value = r[fi] else: # assume key is a list or tuple, i.e., compound key value = [] for f in key: fi = self._field_names.index(f) + if fi >= len(r): + break value.append(r[fi]) value = tuple(value) # enable hashing if value in values: @@ -728,12 +732,12 @@ def _apply_unique_checks(self, i, r, unique_sets, values.add(value) - def _apply_each_methods(self, i, r, - summarize=False, + def _apply_each_methods(self, i, r, + summarize=False, report_unexpected_exceptions=True, context=None): """Invoke 'each' methods on `r`.""" - + for a in dir(self): if a.startswith('each'): rdict = self._as_dict(r) @@ -748,18 +752,18 @@ def _apply_each_methods(self, i, r, p['row'] = i + 1 p['record'] = r p['exception'] = e - p['function'] = '%s: %s' % (f.__name__, + p['function'] = '%s: %s' % (f.__name__, f.__doc__) if context is not None: p['context'] = context yield p - - def _apply_assert_methods(self, i, r, - summarize=False, + + def _apply_assert_methods(self, i, r, + summarize=False, report_unexpected_exceptions=True, context=None): """Apply 'assert' methods on `r`.""" - + for a in dir(self): if a.startswith('assert'): rdict = self._as_dict(r) @@ -767,10 +771,19 @@ def _apply_assert_methods(self, i, r, try: f(rdict) except AssertionError as e: - code = e.args[0] if len(e.args) > 0 else ASSERT_CHECK_FAILED + code = ASSERT_CHECK_FAILED + message = MESSAGES[ASSERT_CHECK_FAILED] + if len(e.args) > 0: + custom = e.args[0] + if isinstance(custom, (list, tuple)): + if len(custom) > 0: + code = custom[0] + if len(custom) > 1: + message = custom[1] + else: + code = custom p = {'code': code} if not summarize: - message = e.args[1] if len(e.args) > 1 else MESSAGES[ASSERT_CHECK_FAILED] p['message'] = message p['row'] = i + 1 p['record'] = r @@ -784,18 +797,18 @@ def _apply_assert_methods(self, i, r, p['row'] = i + 
1 p['record'] = r p['exception'] = e - p['function'] = '%s: %s' % (f.__name__, + p['function'] = '%s: %s' % (f.__name__, f.__doc__) if context is not None: p['context'] = context yield p - - - def _apply_check_methods(self, i, r, - summarize=False, + + + def _apply_check_methods(self, i, r, + summarize=False, report_unexpected_exceptions=True, context=None): """Apply 'check' methods on `r`.""" - + for a in dir(self): if a.startswith('check'): rdict = self._as_dict(r) @@ -821,28 +834,37 @@ def _apply_check_methods(self, i, r, p['row'] = i + 1 p['record'] = r p['exception'] = e - p['function'] = '%s: %s' % (f.__name__, + p['function'] = '%s: %s' % (f.__name__, f.__doc__) if context is not None: p['context'] = context yield p - - - def _apply_finally_assert_methods(self, - summarize=False, + + + def _apply_finally_assert_methods(self, + summarize=False, report_unexpected_exceptions=True, context=None): """Apply 'finally_assert' methods.""" - + for a in dir(self): if a.startswith('finally_assert'): f = getattr(self, a) try: f() except AssertionError as e: - code = e.args[0] if len(e.args) > 0 else FINALLY_ASSERT_CHECK_FAILED + code = FINALLY_ASSERT_CHECK_FAILED # keep the finally-assert defaults, as before this change + message = MESSAGES[FINALLY_ASSERT_CHECK_FAILED] + if len(e.args) > 0: + custom = e.args[0] + if isinstance(custom, (list, tuple)): + if len(custom) > 0: + code = custom[0] + if len(custom) > 1: + message = custom[1] + else: + code = custom p = {'code': code} if not summarize: - message = e.args[1] if len(e.args) > 1 else MESSAGES[FINALLY_ASSERT_CHECK_FAILED] p['message'] = message if context is not None: p['context'] = context yield p @@ -852,18 +874,18 @@ def _apply_finally_assert_methods(self, if not summarize: p['message'] = MESSAGES[UNEXPECTED_EXCEPTION] % (e.__class__.__name__, e) p['exception'] = e - p['function'] = '%s: %s' % (f.__name__, + p['function'] = '%s: %s' % (f.__name__, f.__doc__) if context is not None: p['context'] = context yield p - - + + def _apply_skips(self, i, r, - summarize=False, + summarize=False, 
report_unexpected_exceptions=True, context=None): """Apply skip functions on `r`.""" - + for skip in self._skips: try: result = skip(r) @@ -877,12 +899,12 @@ def _apply_skips(self, i, r, p['row'] = i + 1 p['record'] = r p['exception'] = e - p['function'] = '%s: %s' % (skip.__name__, + p['function'] = '%s: %s' % (skip.__name__, skip.__doc__) if context is not None: p['context'] = context yield p - - + + def _as_dict(self, r): """Convert the record to a dictionary using field names as keys.""" @@ -890,45 +912,45 @@ def _as_dict(self, r): for i, f in enumerate(self._field_names): d[f] = r[i] if i < len(r) else None return d - - + + def enumeration(*args): """ Return a value check function which raises a value error if the value is not in a pre-defined enumeration of values. - + If you pass in a list, tuple or set as the single argument, it is assumed that the list/tuple/set defines the membership of the enumeration. - + If you pass in more than on argument, it is assumed the arguments themselves define the enumeration. - + """ - + assert len(args) > 0, 'at least one argument is required' if len(args) == 1: # assume the first argument defines the membership - members = args[0] + members = args[0] else: # assume the arguments are the members - members = args + members = args def checker(value): if value not in members: raise ValueError(value) - return checker - - + return checker + + def match_pattern(regex): """ Return a value check function which raises a ValueError if the value does not match the supplied regular expression, see also `re.match`. 
- + """ - + prog = re.compile(regex) def checker(v): result = prog.match(v) - if result is None: + if result is None: raise ValueError(v) return checker @@ -936,26 +958,26 @@ def checker(v): def search_pattern(regex): """ Return a value check function which raises a ValueError if the supplied - regular expression does not match anywhere in the value, see also + regular expression does not match anywhere in the value, see also `re.search`. - + """ - + prog = re.compile(regex) def checker(v): result = prog.search(v) - if result is None: + if result is None: raise ValueError(v) return checker def number_range_inclusive(min, max, type=float): """ - Return a value check function which raises a ValueError if the supplied + Return a value check function which raises a ValueError if the supplied value when cast as `type` is less than `min` or greater than `max`. - + """ - + def checker(v): if type(v) < min or type(v) > max: raise ValueError(v) @@ -964,12 +986,12 @@ def checker(v): def number_range_exclusive(min, max, type=float): """ - Return a value check function which raises a ValueError if the supplied - value when cast as `type` is less than or equal to `min` or greater than - or equal to `max`. - + Return a value check function which raises a ValueError if the supplied + value when cast as `type` is less than or equal to `min` or greater than + or equal to `max`. + """ - + def checker(v): if type(v) <= min or type(v) >= max: raise ValueError(v) @@ -978,13 +1000,13 @@ def checker(v): def datetime_string(format): """ - Return a value check function which raises a ValueError if the supplied + Return a value check function which raises a ValueError if the supplied value cannot be converted to a datetime using the supplied format string. - + See also `datetime.strptime`. 
- + """ - + def checker(v): datetime.strptime(v, format) return checker @@ -992,10 +1014,10 @@ def checker(v): def datetime_range_inclusive(min, max, format): """ - Return a value check function which raises a ValueError if the supplied - value when converted to a datetime using the supplied `format` string is + Return a value check function which raises a ValueError if the supplied + value when converted to a datetime using the supplied `format` string is less than `min` or greater than `max`. - + """ dmin = datetime.strptime(min, format) @@ -1009,10 +1031,10 @@ def checker(v): def datetime_range_exclusive(min, max, format): """ - Return a value check function which raises a ValueError if the supplied - value when converted to a datetime using the supplied `format` string is + Return a value check function which raises a ValueError if the supplied + value when converted to a datetime using the supplied `format` string is less than or equal to `min` or greater than or equal to `max`. - + """ dmin = datetime.strptime(min, format) @@ -1027,7 +1049,7 @@ def checker(v): def write_problems(problems, file, summarize=False, limit=0): """ Write problems as restructured text to a file (or stdout/stderr). 
- + """ w = file.write # convenience variable w(""" @@ -1047,9 +1069,9 @@ def write_problems(problems, file, summarize=False, limit=0): """) total += 1 code = p['code'] - if code in counts: + if code in counts: counts[code] += 1 - else: + else: counts[code] = 1 if not summarize: ptitle = '\n%s - %s\n' % (p['code'], p['message']) @@ -1059,13 +1081,13 @@ def write_problems(problems, file, summarize=False, limit=0): underline += '-' underline += '\n' w(underline) - for k in sorted(p.viewkeys() - {'code', 'message', 'context'}): + for k in sorted(p.viewkeys() - set(['code', 'message', 'context'])): w(':%s: %s\n' % (k, p[k])) if 'context' in p: c = p['context'] for k in sorted(c.viewkeys()): w(':%s: %s\n' % (k, c[k])) - + w(""" Summary ======= @@ -1076,4 +1098,4 @@ def write_problems(problems, file, summarize=False, limit=0): for code in sorted(counts.viewkeys()): w(':%s: %s\n' % (code, counts[code])) return total - + diff --git a/setup.py b/setup.py index 062e4f0..5f78bac 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup(name='csvvalidator', - version='1.2-SNAPSHOT', + version='1.3-SNAPSHOT', author='Alistair Miles', author_email='alimanfoo@googlemail.com', url='https://github.com/alimanfoo/csvvalidator', diff --git a/tests.py b/tests.py index 540c04d..f62f406 100644 --- a/tests.py +++ b/tests.py @@ -34,7 +34,7 @@ def test_value_checks(): validator = CSVValidator(field_names) validator.add_value_check('foo', int) validator.add_value_check('bar', float) - + # some test data data = ( ('foo', 'bar'), # row 1 - header row @@ -46,27 +46,27 @@ def test_value_checks(): ('12', ''), # row 7 - bar invalid (empty) ('abc', 'def') # row 8 - both invalid ) - + # run the validator on the test data problems = validator.validate(data) - + assert len(problems) == 7 - + # N.B., expect row and column indices start from 1 - - problems_row2 = [p for p in problems if p['row'] == 2] + + problems_row2 = [p for p in problems if p['row'] == 2] assert 
len(problems_row2) == 0 # should be valid - + problems_row3 = [p for p in problems if p['row'] == 3] assert len(problems_row3) == 1 - p = problems_row3[0] # convenience variable + p = problems_row3[0] # convenience variable assert p['column'] == 1 # report column index assert p['field'] == 'foo' # report field name assert p['code'] == VALUE_CHECK_FAILED # default problem code for value checks assert p['message'] == MESSAGES[VALUE_CHECK_FAILED] # default message assert p['value'] == '1.2' # report bad value assert p['record'] == ('1.2', '3.4') # report record - + problems_row4 = [p for p in problems if p['row'] == 4] assert len(problems_row4) == 1 p = problems_row4[0] # convenience variable @@ -76,7 +76,7 @@ def test_value_checks(): assert p['message'] == MESSAGES[VALUE_CHECK_FAILED] assert p['value'] == 'abc' assert p['record'] == ('abc', '3.4') - + problems_row5 = [p for p in problems if p['row'] == 5] assert len(problems_row5) == 1 p = problems_row5[0] # convenience variable @@ -85,8 +85,8 @@ def test_value_checks(): assert p['code'] == VALUE_CHECK_FAILED assert p['message'] == MESSAGES[VALUE_CHECK_FAILED] assert p['value'] == 'abc' - assert p['record'] == ('12', 'abc') - + assert p['record'] == ('12', 'abc') + problems_row6 = [p for p in problems if p['row'] == 6] assert len(problems_row6) == 1 p = problems_row6[0] # convenience variable @@ -96,7 +96,7 @@ def test_value_checks(): assert p['message'] == MESSAGES[VALUE_CHECK_FAILED] assert p['value'] == '' assert p['record'] == ('', '3.4') - + problems_row7 = [p for p in problems if p['row'] == 7] assert len(problems_row7) == 1 p = problems_row7[0] # convenience variable @@ -105,8 +105,8 @@ def test_value_checks(): assert p['code'] == VALUE_CHECK_FAILED assert p['message'] == MESSAGES[VALUE_CHECK_FAILED] assert p['value'] == '' - assert p['record'] == ('12', '') - + assert p['record'] == ('12', '') + problems_row8 = [p for p in problems if p['row'] == 8] assert len(problems_row8) == 2 # expect both problems are 
found p0 = problems_row8[0] # convenience variable @@ -123,21 +123,21 @@ def test_value_checks(): assert p1['message'] == MESSAGES[VALUE_CHECK_FAILED] assert p1['value'] == 'def' assert p1['record'] == ('abc', 'def') - - + + def test_header_check(): """Test the header checks work.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) validator.add_header_check() # use default code and message validator.add_header_check(code='X1', message='custom message') # provide custom code and message - + data = ( ('foo', 'baz'), ('123', '456') ) - + problems = validator.validate(data) assert len(problems) == 2 @@ -145,25 +145,25 @@ def test_header_check(): assert p0['code'] == HEADER_CHECK_FAILED assert p0['message'] == MESSAGES[HEADER_CHECK_FAILED] assert p0['record'] == ('foo', 'baz') - assert p0['missing'] == {'bar'} - assert p0['unexpected'] == {'baz'} + assert p0['missing'] == set(['bar']) + assert p0['unexpected'] == set(['baz']) assert p0['row'] == 1 p1 = problems[1] assert p1['code'] == 'X1' assert p1['message'] == 'custom message' - assert p1['missing'] == {'bar'} - assert p1['unexpected'] == {'baz'} + assert p1['missing'] == set(['bar']) + assert p1['unexpected'] == set(['baz']) assert p1['record'] == ('foo', 'baz') assert p1['row'] == 1 - - + + def test_ignore_lines(): """Test instructions to ignore lines works.""" field_names = ('foo', 'bar') validator = CSVValidator(field_names) - validator.add_header_check() + validator.add_header_check() validator.add_value_check('foo', int) validator.add_value_check('bar', float) @@ -173,7 +173,7 @@ def test_ignore_lines(): ('foo', 'baz'), ('1.2', 'abc') ) - + problems = validator.validate(data, ignore_lines=2) assert len(problems) == 3 @@ -185,23 +185,23 @@ def test_ignore_lines(): assert len(value_problems) == 2 for p in value_problems: assert p['row'] == 4 - + def test_record_length_checks(): """Test the record length checks.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) 
validator.add_record_length_check() # test default code and message validator.add_record_length_check('X2', 'custom message') - + data = ( ('foo', 'bar'), ('12', '3.4'), ('12',), # be careful with syntax for singleton tuples ('12', '3.4', 'spong') ) - + problems = validator.validate(data) assert len(problems) == 4, len(problems) @@ -218,7 +218,7 @@ def test_record_length_checks(): assert d1['row'] == 4 assert d1['record'] == ('12', '3.4', 'spong') assert d1['length'] == 3 - + # find problems reported under custom code custom_problems = [p for p in problems if p['code'] == 'X2'] assert len(custom_problems) == 2 @@ -232,51 +232,51 @@ def test_record_length_checks(): assert c1['row'] == 4 assert c1['record'] == ('12', '3.4', 'spong') assert c1['length'] == 3 - - + + def test_value_checks_with_missing_values(): """ Establish expected behaviour for value checks where there are missing values in the records. - + """ - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) validator.add_value_check('bar', float) - + data = ( ('foo', 'bar'), ('12',) # this is missing value for bar, what happens to value check? 
) - + problems = validator.validate(data) - + # missing values are ignored - use record length checks to find these assert len(problems) == 0 - - + + def test_value_check_enumeration(): """Test value checks with the enumeration() function.""" - + field_names = ('foo', 'bar', 'baz') validator = CSVValidator(field_names) # define an enumeration directly with arguments - validator.add_value_check('bar', enumeration('M', 'F')) + validator.add_value_check('bar', enumeration('M', 'F')) # define an enumeration by passing in a list or tuple flavours = ('chocolate', 'vanilla', 'strawberry') - validator.add_value_check('baz', enumeration(flavours)) - + validator.add_value_check('baz', enumeration(flavours)) + data = ( ('foo', 'bar', 'baz'), ('1', 'M', 'chocolate'), ('2', 'F', 'maple pecan'), ('3', 'X', 'strawberry') ) - + problems = validator.validate(data) assert len(problems) == 2 - + p0 = problems[0] assert p0['code'] == VALUE_CHECK_FAILED assert p0['row'] == 3 @@ -284,7 +284,7 @@ def test_value_check_enumeration(): assert p0['field'] == 'baz' assert p0['value'] == 'maple pecan' assert p0['record'] == ('2', 'F', 'maple pecan') - + p1 = problems[1] assert p1['code'] == VALUE_CHECK_FAILED assert p1['row'] == 4 @@ -292,15 +292,15 @@ def test_value_check_enumeration(): assert p1['field'] == 'bar' assert p1['value'] == 'X' assert p1['record'] == ('3', 'X', 'strawberry') - - + + def test_value_check_match_pattern(): """Test value checks with the match_pattern() function.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) validator.add_value_check('bar', match_pattern('\d{4}-\d{2}-\d{2}')) - + data = ( ('foo', 'bar'), ('1', '1999-01-01'), @@ -308,7 +308,7 @@ def test_value_check_match_pattern(): ('3', 'a1999-01-01'), ('4', '1999-01-01a') # this is valid - pattern attempts to match at beginning of line ) - + problems = validator.validate(data) assert len(problems) == 2, len(problems) for p in problems: @@ -316,15 +316,15 @@ def 
test_value_check_match_pattern(): assert problems[0]['row'] == 3 assert problems[1]['row'] == 4 - - + + def test_value_check_search_pattern(): """Test value checks with the search_pattern() function.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) validator.add_value_check('bar', search_pattern('\d{4}-\d{2}-\d{2}')) - + data = ( ('foo', 'bar'), ('1', '1999-01-01'), @@ -332,7 +332,7 @@ def test_value_check_search_pattern(): ('3', 'a1999-01-01'), # this is valid - pattern attempts to match anywhere in line ('4', '1999-01-01a') # this is valid - pattern attempts to match anywhere in line ) - + problems = validator.validate(data) assert len(problems) == 1, len(problems) assert problems[0]['code'] == VALUE_CHECK_FAILED @@ -341,14 +341,14 @@ def test_value_check_search_pattern(): def test_value_check_numeric_ranges(): """Test value checks with numerical range functions.""" - + field_names = ('foo', 'bar', 'baz', 'quux') validator = CSVValidator(field_names) validator.add_value_check('foo', number_range_inclusive(2, 6, int)) validator.add_value_check('bar', number_range_exclusive(2, 6, int)) - validator.add_value_check('baz', number_range_inclusive(2.0, 6.3, float)) - validator.add_value_check('quux', number_range_exclusive(2.0, 6.3, float)) - + validator.add_value_check('baz', number_range_inclusive(2.0, 6.3, float)) + validator.add_value_check('quux', number_range_exclusive(2.0, 6.3, float)) + data = ( ('foo', 'bar', 'baz', 'quux'), ('2', '3', '2.0', '2.1'), # valid @@ -357,7 +357,7 @@ def test_value_check_numeric_ranges(): ('2', '3', '1.9', '2.1'), # baz invalid ('2', '3', '2.0', '2.0') # quux invalid ) - + problems = validator.validate(data) assert len(problems) == 4, len(problems) for p in problems: @@ -367,15 +367,15 @@ def test_value_check_numeric_ranges(): assert problems[1]['row'] == 4 and problems[1]['field'] == 'bar' assert problems[2]['row'] == 5 and problems[2]['field'] == 'baz' assert problems[3]['row'] == 6 and problems[3]['field'] 
== 'quux' - - + + def test_value_checks_datetime(): """Test value checks with datetimes.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) validator.add_value_check('bar', datetime_string('%Y-%m-%d')) - + data = ( ('foo', 'bar'), ('A', '1999-09-09'), # valid @@ -383,7 +383,7 @@ def test_value_checks_datetime(): ('C', '1999-09-32'), # invalid day ('D', '1999-09-09ss') # invalid string ) - + problems = validator.validate(data) assert len(problems) == 3, problems for p in problems: @@ -392,20 +392,20 @@ def test_value_checks_datetime(): assert problems[0]['row'] == 3 and problems[0]['field'] == 'bar' assert problems[1]['row'] == 4 and problems[1]['field'] == 'bar' assert problems[2]['row'] == 5 and problems[2]['field'] == 'bar' - - + + def test_value_checks_datetime_range(): """Test value checks with datetime ranges.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) - validator.add_value_check('bar', datetime_range_inclusive('1999-09-09', - '2009-09-09', + validator.add_value_check('bar', datetime_range_inclusive('1999-09-09', + '2009-09-09', '%Y-%m-%d')) - validator.add_value_check('bar', datetime_range_exclusive('1999-09-09', - '2009-09-09', + validator.add_value_check('bar', datetime_range_exclusive('1999-09-09', + '2009-09-09', '%Y-%m-%d')) - + data = ( ('foo', 'bar'), ('A', '1999-09-10'), # valid @@ -414,102 +414,102 @@ def test_value_checks_datetime_range(): ('D', '1999-09-08'), # invalid (both) ('E', '2009-09-10') # invalid (both) ) - + problems = validator.validate(data) - + assert len(problems) == 6, len(problems) assert len([p for p in problems if p['row'] == 3]) == 1 assert len([p for p in problems if p['row'] == 4]) == 1 assert len([p for p in problems if p['row'] == 5]) == 2 assert len([p for p in problems if p['row'] == 6]) == 2 - - + + def test_value_predicates(): """Test the use of value predicates.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) - foo_predicate = lambda v: 
math.pow(float(v), 2) < 64 + foo_predicate = lambda v: math.pow(float(v), 2) < 64 validator.add_value_predicate('foo', foo_predicate) bar_predicate = lambda v: math.sqrt(float(v)) > 8 validator.add_value_predicate('bar', bar_predicate, 'X3', 'custom message') - + data = ( ('foo', 'bar'), ('4', '81'), # valid ('9', '81'), # foo invalid ('4', '49') # bar invalid ) - + problems = validator.validate(data) assert len(problems) == 2, len(problems) - + p0 = problems[0] assert p0['code'] == VALUE_PREDICATE_FALSE - assert p0['message'] == MESSAGES[VALUE_PREDICATE_FALSE] + assert p0['message'] == MESSAGES[VALUE_PREDICATE_FALSE] assert p0['row'] == 3 assert p0['column'] == 1 assert p0['field'] == 'foo' assert p0['value'] == '9' assert p0['record'] == ('9', '81') - + p1 = problems[1] assert p1['code'] == 'X3' - assert p1['message'] == 'custom message' + assert p1['message'] == 'custom message' assert p1['row'] == 4 assert p1['column'] == 2 assert p1['field'] == 'bar' assert p1['value'] == '49' assert p1['record'] == ('4', '49') - - + + def test_record_checks(): """Test the use of record checks.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) - + def foo_gt_bar(r): foo = int(r['foo']) bar = int(r['bar']) if foo < bar: raise RecordError validator.add_record_check(foo_gt_bar) # use default code and message - + def foo_gt_2bar(r): foo = int(r['foo']) bar = int(r['bar']) if foo < 2 * bar: raise RecordError('X4', 'custom message') validator.add_record_check(foo_gt_2bar) - + data = ( ('foo', 'bar'), ('7', '3'), # valid ('5', '3'), # invalid - not foo_gt_2bar ('1', '3') # invalid - both predicates false ) - + problems = validator.validate(data) n = len(problems) assert n == 3, n - + row3_problems = [p for p in problems if p['row'] == 3] assert len(row3_problems) == 1 p = row3_problems[0] assert p['code'] == 'X4' assert p['message'] == 'custom message' assert p['record'] == ('5', '3') - + row4_problems = [p for p in problems if p['row'] == 4] assert 
len(row4_problems) == 2 - + row4_problems_default = [p for p in row4_problems if p['code'] == RECORD_CHECK_FAILED] assert len(row4_problems_default) == 1 p = row4_problems_default[0] assert p['message'] == MESSAGES[RECORD_CHECK_FAILED] assert p['record'] == ('1', '3') - + row4_problems_custom = [p for p in row4_problems if p['code'] == 'X4'] assert len(row4_problems_custom) == 1 p = row4_problems_custom[0] @@ -519,45 +519,45 @@ def foo_gt_2bar(r): def test_record_predicates(): """Test the use of record predicates.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) - + def foo_gt_bar(r): return int(r['foo']) > int(r['bar']) # expect record will be a dictionary validator.add_record_predicate(foo_gt_bar) # use default code and message - + def foo_gt_2bar(r): return int(r['foo']) > 2 * int(r['bar']) validator.add_record_predicate(foo_gt_2bar, 'X4', 'custom message') - + data = ( ('foo', 'bar'), ('7', '3'), # valid ('5', '3'), # invalid - not foo_gt_2bar ('1', '3') # invalid - both predicates false ) - + problems = validator.validate(data) n = len(problems) assert n == 3, n - + row3_problems = [p for p in problems if p['row'] == 3] assert len(row3_problems) == 1 p = row3_problems[0] assert p['code'] == 'X4' assert p['message'] == 'custom message' assert p['record'] == ('5', '3') - + row4_problems = [p for p in problems if p['row'] == 4] assert len(row4_problems) == 2 - + row4_problems_default = [p for p in row4_problems if p['code'] == RECORD_PREDICATE_FALSE] assert len(row4_problems_default) == 1 p = row4_problems_default[0] assert p['message'] == MESSAGES[RECORD_PREDICATE_FALSE] assert p['record'] == ('1', '3') - + row4_problems_custom = [p for p in row4_problems if p['code'] == 'X4'] assert len(row4_problems_custom) == 1 p = row4_problems_custom[0] @@ -566,23 +566,23 @@ def foo_gt_2bar(r): def test_unique_checks(): - """Test the uniqueness checks.""" + """Test the uniqueness checks.""" field_names = ('foo', 'bar') validator = 
CSVValidator(field_names) validator.add_unique_check('foo') - + data = ( ('foo', 'bar'), ('1', 'A'), - ('2', 'B'), + ('2', 'B'), ('1', 'C') ) - + problems = validator.validate(data) n = len(problems) assert n == 1, n - + p = problems[0] assert p['code'] == UNIQUE_CHECK_FAILED assert p['message'] == MESSAGES[UNIQUE_CHECK_FAILED] @@ -590,28 +590,53 @@ def test_unique_checks(): assert p['key'] == 'foo' assert p['value'] == '1' assert p['record'] == ('1', 'C') - - + +def test_unique_checks_with_variable_record_lengths(): + """Test the uniqueness checks still work when record lengths vary.""" + + field_names = ('foo', 'bar') + validator = CSVValidator(field_names) + validator.add_unique_check('bar') + + data = ( + ('foo', 'bar'), + ('1', 'A'), + ('2'), + ('3', 'A') + ) + + problems = validator.validate(data) + n = len(problems) + assert n == 1, n + + p = problems[0] + assert p['code'] == UNIQUE_CHECK_FAILED + assert p['message'] == MESSAGES[UNIQUE_CHECK_FAILED] + assert p['row'] == 4 + assert p['key'] == 'bar' + assert p['value'] == 'A' + assert p['record'] == ('3', 'A') + def test_compound_unique_checks(): - """Test the uniqueness checks on compound keys.""" + """Test the uniqueness checks on compound keys.""" field_names = ('foo', 'bar') validator = CSVValidator(field_names) validator.add_unique_check(('foo', 'bar'), 'X5', 'custom message') - + data = ( ('foo', 'bar'), ('1', 'A'), - ('2', 'B'), + ('2', 'B'), ('1', 'B'), ('2', 'A'), ('1', 'A') ) - + problems = validator.validate(data) n = len(problems) assert n == 1, n - + p = problems[0] assert p['code'] == 'X5' assert p['message'] == 'custom message' @@ -619,100 +644,130 @@ def test_compound_unique_checks(): assert p['key'] == ('foo', 'bar') assert p['value'] == ('1', 'A') assert p['record'] == ('1', 'A') - - + + +def test_compound_unique_checks_with_variable_record_lengths(): + """Test the uniqueness checks on compound keys when record lengths vary.""" + + field_names = ('something', 'foo', 'bar') + validator = 
CSVValidator(field_names) + validator.add_unique_check(('foo', 'bar'), 'X5', 'custom message') + + data = ( + ('something', 'foo', 'bar'), + ('Z', '1', 'A'), + ('Z', '2', 'B'), + ('Z'), + ('Z', '2', 'A'), + ('Z', '1', 'A') + ) + + problems = validator.validate(data) + print problems + n = len(problems) + assert n == 1, n + + p = problems[0] + assert p['code'] == 'X5' + assert p['message'] == 'custom message' + assert p['row'] == 6 + assert p['key'] == ('foo', 'bar') + assert p['value'] == ('1', 'A') + assert p['record'] == ('Z', '1', 'A') + + def test_assert_methods(): """Test use of 'assert' methods.""" - - # define a custom validator class + + # define a custom validator class class MyValidator(CSVValidator): - + def __init__(self, threshold): field_names = ('foo', 'bar') super(MyValidator, self).__init__(field_names) self._threshold = threshold - + def assert_foo_plus_bar_gt_threshold(self, r): assert int(r['foo']) + int(r['bar']) > self._threshold # use default error code and message - + def assert_foo_times_bar_gt_threshold(self, r): assert int(r['foo']) * int(r['bar']) > self._threshold, ('X6', 'custom message') - + validator = MyValidator(42) - + data = ( ('foo', 'bar'), ('33', '10'), # valid ('7', '8'), # invalid (foo + bar less than threshold) - ('3', '4'), # invalid (both) + ('3', '4'), # invalid (both) ) - + problems = validator.validate(data) n = len(problems) assert n == 3, n - + row3_problems = [p for p in problems if p['row'] == 3] assert len(row3_problems) == 1 p = row3_problems[0] assert p['code'] == ASSERT_CHECK_FAILED assert p['message'] == MESSAGES[ASSERT_CHECK_FAILED] assert p['record'] == ('7', '8') - + row4_problems = [p for p in problems if p['row'] == 4] assert len(row4_problems) == 2 row4_problems_custom = [p for p in row4_problems if p['code'] == 'X6'] - assert len(row4_problems_custom) == 1 + assert len(row4_problems_custom) == 1, row4_problems p = row4_problems_custom[0] assert p['message'] == 'custom message' assert p['record'] == 
('3', '4') - + row4_problems_default = [p for p in row4_problems if p['code'] == ASSERT_CHECK_FAILED] assert len(row4_problems_default) == 1 p = row4_problems_default[0] assert p['message'] == MESSAGES[ASSERT_CHECK_FAILED] assert p['record'] == ('3', '4') - + def test_check_methods(): """Test use of 'check' methods.""" - - # define a custom validator class + + # define a custom validator class class MyValidator(CSVValidator): - + def __init__(self, threshold): field_names = ('foo', 'bar') super(MyValidator, self).__init__(field_names) self._threshold = threshold - + def check_foo_plus_bar_gt_threshold(self, r): if int(r['foo']) + int(r['bar']) <= self._threshold: raise RecordError # use default error code and message - + def check_foo_times_bar_gt_threshold(self, r): if int(r['foo']) * int(r['bar']) <= self._threshold: raise RecordError('X6', 'custom message') - + validator = MyValidator(42) - + data = ( ('foo', 'bar'), ('33', '10'), # valid ('7', '8'), # invalid (foo + bar less than threshold) - ('3', '4'), # invalid (both) + ('3', '4'), # invalid (both) ) - + problems = validator.validate(data) n = len(problems) assert n == 3, n - + row3_problems = [p for p in problems if p['row'] == 3] assert len(row3_problems) == 1 p = row3_problems[0] assert p['code'] == RECORD_CHECK_FAILED assert p['message'] == MESSAGES[RECORD_CHECK_FAILED] assert p['record'] == ('7', '8') - + row4_problems = [p for p in problems if p['row'] == 4] assert len(row4_problems) == 2 @@ -721,50 +776,50 @@ def check_foo_times_bar_gt_threshold(self, r): p = row4_problems_custom[0] assert p['message'] == 'custom message' assert p['record'] == ('3', '4') - + row4_problems_default = [p for p in row4_problems if p['code'] == RECORD_CHECK_FAILED] assert len(row4_problems_default) == 1 p = row4_problems_default[0] assert p['message'] == MESSAGES[RECORD_CHECK_FAILED] assert p['record'] == ('3', '4') - + def test_each_and_finally_assert_methods(): """Test 'each' and 'finally_assert' methods.""" - - # define 
a custom validator class + + # define a custom validator class class MyValidator(CSVValidator): - + def __init__(self, threshold): field_names = ('foo', 'bar') super(MyValidator, self).__init__(field_names) self._threshold = threshold self._bars = [] self._count = 0 - + def each_store_bar(self, r): n = float(r['bar']) self._bars.append(n) self._count += 1 - + def finally_assert_mean_bar_gt_threshold(self): mean = sum(self._bars) / self._count assert mean > self._threshold, ('X7', 'custom message') - + data = [ ['foo', 'bar'], ['A', '2'], ['B', '3'], ['C', '7'] ] - + validator = MyValidator(5.0) problems = validator.validate(data) assert len(problems) == 1 p = problems[0] assert p['code'] == 'X7' assert p['message'] == 'custom message' - + data.append(['D', '10']) validator = MyValidator(5.0) problems = validator.validate(data) @@ -773,62 +828,62 @@ def finally_assert_mean_bar_gt_threshold(self): def test_exception_handling(): """Establish expectations for exception handling.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) - + validator.add_value_check('foo', int) - + def buggy_value_check(v): """I am a buggy value check.""" raise Exception('something went wrong') validator.add_value_check('bar', buggy_value_check) - + def buggy_value_predicate(v): """I am a buggy value predicate.""" raise Exception('something went wrong') validator.add_value_predicate('bar', buggy_value_predicate) - + def buggy_record_check(r): """I am a buggy record check.""" raise Exception('something went wrong') validator.add_record_check(buggy_record_check) - + def buggy_record_predicate(r): """I am a buggy record predicate.""" raise Exception('something went wrong') validator.add_record_predicate(buggy_record_predicate) - + def buggy_assert(r): """I am a buggy assert.""" raise Exception('something went wrong') validator.assert_something_buggy = buggy_assert - + def buggy_check(r): """I am a buggy check.""" raise Exception('something went wrong') 
validator.check_something_buggy = buggy_check - + def buggy_each(r): """I am a buggy each.""" raise Exception('something went wrong') validator.each_something_buggy = buggy_each - + def buggy_finally_assert(): """I am a buggy finally assert.""" raise Exception('something went wrong') validator.finally_assert_something_buggy = buggy_finally_assert - + def buggy_skip(record): """I am a buggy skip.""" raise Exception('something went wrong') validator.add_skip(buggy_skip) - + data = ( ('foo', 'bar'), ('ab', '56') ) - + problems = validator.validate(data, report_unexpected_exceptions=False) n = len(problems) assert n == 1, n @@ -838,12 +893,12 @@ def buggy_skip(record): problems = validator.validate(data) # by default, exceptions are reported as problems n = len(problems) assert n == 10, n - + unexpected_problems = [p for p in problems if p['code'] == UNEXPECTED_EXCEPTION] assert len(unexpected_problems) == 9 for p in unexpected_problems: e = p['exception'] - assert e.args[0] == 'something went wrong', e.args + assert e.args[0] == 'something went wrong', e.args def test_summarize(): @@ -851,21 +906,21 @@ def test_summarize(): field_names = ('foo', 'bar') validator = CSVValidator(field_names) - + def foo_gt_bar(r): - return int(r['foo']) > int(r['bar']) - validator.add_record_predicate(foo_gt_bar) - + return int(r['foo']) > int(r['bar']) + validator.add_record_predicate(foo_gt_bar) + data = ( ('foo', 'bar'), ('7', '3'), # valid - ('1', '3') # invalid + ('1', '3') # invalid ) - + problems = validator.validate(data, summarize=True) n = len(problems) assert n == 1, n - + p = problems[0] assert p['code'] == RECORD_PREDICATE_FALSE for k in ('message', 'row', 'record'): @@ -873,26 +928,26 @@ def foo_gt_bar(r): def test_limit(): - """Test the use of the limit option.""" + """Test the use of the limit option.""" field_names = ('foo', 'bar') validator = CSVValidator(field_names) - + def foo_gt_bar(r): - return int(r['foo']) > int(r['bar']) - 
validator.add_record_predicate(foo_gt_bar) - + return int(r['foo']) > int(r['bar']) + validator.add_record_predicate(foo_gt_bar) + data = ( ('foo', 'bar'), ('7', '3'), # valid - ('1', '3'), # invalid - ('2', '3') # invalid + ('1', '3'), # invalid + ('2', '3') # invalid ) - + problems = validator.validate(data, limit=1) n = len(problems) assert n == 1, n - + problems = validator.validate(data) n = len(problems) assert n == 2, n @@ -900,42 +955,42 @@ def foo_gt_bar(r): def test_context(): """Test passing in of context information.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) - + def foo_gt_bar(r): - return int(r['foo']) > int(r['bar']) - validator.add_record_predicate(foo_gt_bar) - + return int(r['foo']) > int(r['bar']) + validator.add_record_predicate(foo_gt_bar) + data = ( ('foo', 'bar'), ('7', '3'), # valid - ('1', '3') # invalid + ('1', '3') # invalid ) - + context = {'info': 'file X'} problems = validator.validate(data, context=context) n = len(problems) assert n == 1, n - + p = problems[0] assert p['context'] == context - + def test_write_problems(): """Test writing problems as restructured text.""" - + class MockFile(object): - + def __init__(self): self.content = '' - + def write(self, s): self.content += s - + file = MockFile() - + problems = [ { 'code': 'X1', @@ -956,7 +1011,7 @@ def write(self, s): } } ] - + expectation = """ ================= Validation Report @@ -989,20 +1044,20 @@ def write(self, s): write_problems(problems, file) assert file.content == expectation, file.content - + def test_write_problems_summarize(): """Test writing a problem summary as restructured text.""" - + class MockFile(object): - + def __init__(self): self.content = '' - + def write(self, s): self.content += s - + file = MockFile() - + problems = [ { 'code': 'X1', @@ -1032,7 +1087,7 @@ def write(self, s): } } ] - + expectation = """ ================= Validation Report @@ -1050,20 +1105,20 @@ def write(self, s): write_problems(problems, file, 
summarize=True) assert file.content == expectation, file.content - + def test_write_problems_with_limit(): """Test writing problems with a limit as restructured text.""" - + class MockFile(object): - + def __init__(self): self.content = '' - + def write(self, s): self.content += s - + file = MockFile() - + problems = [ { 'code': 'X1', @@ -1084,7 +1139,7 @@ def write(self, s): } } ] - + expectation = """ ================= Validation Report @@ -1108,33 +1163,33 @@ def write(self, s): """ write_problems(problems, file, limit=1) - assert file.content == expectation, file.content - - + assert file.content == expectation, file.content + + def test_skips(): """Test skip functions.""" - + field_names = ('foo', 'bar') validator = CSVValidator(field_names) - + validator.add_record_length_check() validator.add_value_check('foo', int) - + def skip_pragma(record): return record[0].startswith('##') validator.add_skip(skip_pragma) - + data = ( ('foo', 'bar'), ('1', 'X'), ('## this row', 'should be', 'skipped'), ('3', 'Y') ) - + problems = validator.validate(data) assert len(problems) == 0, problems - - + + def test_guard_conditions(): """Test some guard conditions.""" @@ -1146,5 +1201,5 @@ def test_guard_conditions(): pass # expected else: assert False, 'expected exception' - - + +