Module hxl.validation
Validation code for the Humanitarian Exchange Language (HXL) v1.0
David Megginson, started October 2014
License: Public Domain
Documentation: https://github.com/HXLStandard/libhxl-python/wiki
A Schema is the top-level class for validating a HXL dataset. The easiest way to create a schema is via the hxl.schema() function, and the schema's validate() method validates a dataset. Here is a simple validation example:
def callback(e):
    print("Validation error:", e.message)
source = hxl.data(data_url)
if not hxl.schema(schema_url, callback=callback).validate(source):
    print('Validation failed')
Each schema contains one or more SchemaRule objects.
Each schema rule contains one or more objects implementing AbstractRuleTest. To add a new test, create a class extending AbstractRuleTest, override the methods you need (validate_cell() is the most common), then add code to the Schema.parse() method to parse and create your new test from the HXL schema.
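For example, a minimal custom test might look like this (an illustrative sketch; the placeholder check is invented for the example):

class NoPlaceholderTest(AbstractRuleTest):
    """Illustrative test: disallow the placeholder value "TODO"."""
    def validate_cell(self, value, row, column):
        if value.strip().lower() == 'todo':
            return self.report_error("Placeholder value not allowed",
                                     value=value, row=row, column=column)
        return True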
Validation tests go through the following workflow:
- needs_scan() to check if the test needs multiple passes
- start()
- scan_row(hxl.model.Row) for each row in the dataset (only if needs_scan() returned True)
- scan_cell(value, hxl.model.Row, hxl.model.Column) for each matching non-empty cell in each row (only if needs_scan() returned True)
- end_scan() (only if needs_scan() returned True)
- validate_dataset(hxl.model.Dataset)
- validate_row(hxl.model.Row) for each row (row-level validations only)
- validate_cell(value, hxl.model.Row, hxl.model.Column) for each matching non-empty cell in each row
- end()
Any failure means that the entire test, rule, and schema fail validation, though error reporting will continue to the end.
Source code:
"""Validation code for the Humanitarian Exchange Language (HXL) v1.0
David Megginson
Started October 2014
License: Public Domain
Documentation: https://github.com/HXLStandard/libhxl-python/wiki
A Schema is the top-level class for validating a HXL dataset. The
easiest way to create a schema is via the hxl.schema() function, and
the schema's validate() method validates a dataset. Here is a simple
validation example:
    def callback(e):
        print("Validation error:", e.message)
    source = hxl.data(data_url)
    if not hxl.schema(schema_url, callback=callback).validate(source):
        print('Validation failed')
Each schema contains one or more SchemaRule objects.
Each schema rule contains one or more objects implementing
AbstractRuleTest. To add a new test, create a class extending
AbstractRuleTest, override the methods you need (validate_cell() is
the most common), then add code to the Schema.parse() method to parse and
create your new test from the HXL schema.
Validation tests go through the following workflow:
- needs_scan() to check if the test needs multiple passes
- start()
- scan_row(hxl.model.Row) for each row in the dataset (only if needs_scan() returned True)
- scan_cell(value, hxl.model.Row, hxl.model.Column) for each matching non-empty cell in each row (only if needs_scan() returned True)
- end_scan() (only if needs_scan() returned True)
- validate_dataset(hxl.model.Dataset)
- validate_row(hxl.model.Row) for each row (row-level validations only)
- validate_cell(value, hxl.model.Row, hxl.model.Column) for each matching non-empty cell in each row
- end()
Any failure means that the entire test, rule, and schema fail
validation, though error reporting will continue to the end.
"""
import hxl
import base64, datetime, hashlib, logging, math, os, re, urllib
logger = logging.getLogger(__name__)
class HXLValidationException(hxl.HXLException):
    """Data structure to hold a HXL validation error.
    Normally, this exception isn't thrown, but is passed as a
    parameter to callbacks via Schema, SchemaRule, and classes
    extending AbstractRuleTest.
    """
    SCOPES = ('dataset', 'column', 'row', 'cell',)
    """Allowed values for the error scope"""
    def __init__(self, message, rule=None, value=None, row=None, column=None, suggested_value=None, scope='cell', is_external=False):
        """Construct a new validation error report.
        @param message: the text message for the error
        @param rule: the rule associated with the error (it may have a more-general descriptive message)
        @param value: the value that triggered the error, if available
        @param row: the hxl.model.Row object associated with the error, if any
        @param column: the hxl.model.Column object associated with the error, if any
        @param suggested_value: the suggested replacement value, if known
        @param scope: the error scope (dataset, column, row, or cell)
        @param is_external: if True, the error is external to the data itself
        """
        super().__init__(message)
        
        self.rule = rule
        self.value = value
        self.row = row
        self.column = column
        self.suggested_value = suggested_value
        self.is_external = is_external
        scope = hxl.datatypes.normalise_string(scope)
        if scope in HXLValidationException.SCOPES:
            self.scope = scope
        else:
            raise hxl.HXLException("Unrecognised validation-error scope: {}".format(scope))
    def __str__(self):
        """Get a string rendition of this error."""
        s = '<HXLValidationException '
        if self.rule and self.rule.tag_pattern:
            if self.value:
                s += '{}={} '.format(str(self.rule.tag_pattern), str(self.value))
            else:
                s += '{} '.format(str(self.rule.tag_pattern))
        if self.message:
            s += '- {}'.format(self.message)
        return s + '>' # close the bracket opened above
#
# Individual tests within a Schema Rule
#
class AbstractRuleTest(object):
    """Base class for a single test inside a validation rule.
    Workflow (triggered by SchemaRule):
    - needs_scan()
    - start()
    - scan_row() for each row (only if needs_scan() returned True)
    - scan_cell() for each non-empty matching cell (only if needs_scan() returned True)
    - end_scan() (only if needs_scan() returned True)
    - validate_dataset()
    - validate_row() for each row
    - validate_cell() for each matching non-empty cell in each row
    - end()
    """
    def __init__(self, callback=None):
        """Set up a schema test.
        @param callback: a callback function to receive error reports
        """
        self.callback = callback
    def needs_scan(self):
        """Report whether this test requires a cached dataset.
        A cached dataset is one that can be processed more than
        once. It requires more memory and processing time, so return
        True only if absolutely necessary.
        If this method returns True, then the validation engine will
        call scan_row() for each row in the dataset, scan_cell() for
        each non-empty matching cell, and end_scan() before invoking
        any of the validate_* methods.
        @returns: True if the test requires a cached dataset.
        """
        return False
    def start(self):
        """Setup code to run before validating each dataset.
        This code should not report errors, since it hasn't seen the data yet.
        """
        return True
    def end(self):
        """Code to run after validating each dataset.
        Will report errors via the test's callback, if available.
        @returns: True if there are no new validation errors
        """
        return True
    def scan_row(self, row, indices=None, tag_pattern=None):
        """Scan a row of the dataset to collect information.
        Will be called for each row in the dataset, but only if
        needs_scan() returns True.
        This method does not report any errors; it simply collects
        information for the validate_*() methods to use later.
        @param row: the L{hxl.model.Row} object to pre-scan.
        @param indices: optional pre-compiled indices for the rule to
        look at (in lieu of tag_pattern)
        @param tag_pattern: optional tag pattern for the rule to use.
        """
        return
    def scan_cell(self, value, row, column):
        """Pre-scan a single cell to collect information.
        Will be called for each matching non-empty cell in each row,
        but only if needs_scan() returns True.
        This method does not report any errors; it simply collects
        information for the validate_*() methods to use later.
        @param value: the non-empty value to validate
        @param row: a hxl.model.Row object for location
        @param column: a hxl.model.Column object for location
        """
        return
    def end_scan(self):
        """Clean-up calculations after scanning and before validation.
        Will be called only if needs_scan() returned True
        Does not report any errors
        """
        return
    def validate_dataset(self, dataset, indices=None, tag_pattern=None):
        """Apply test at the dataset level
        Called before validate_row() or validate_cell()
        Will report errors via the test's callback, if available.
        @param dataset: a hxl.model.Dataset object to validate
        @param indices: optional pre-compiled indices for columns matching tag_pattern
        @returns: True if there are no new validation errors.
        """
        return True
    def validate_row(self, row, indices=None, tag_pattern=None):
        """Apply test at the row level
        Called for each row before validate_cell() calls.
        Will report errors via the test's callback, if available.
        @param row: a hxl.model.Row object to validate
        @param indices: optional pre-compiled indices for columns matching tag_pattern
        @returns: True if there are no new validation errors.
        """
        return True
    def validate_cell(self, value, row, column):
        """Apply test at the cell level
        Called for each matching non-empty value.
        Will also report errors via the test's callback, if available.
        @param value: the non-empty value to validate
        @param row: a hxl.model.Row object for location
        @param column: a hxl.model.Column object for location
        @returns: True if there are no new validation errors.
        """
        return True
    def get_indices(self, indices, tag_pattern, columns):
        """Get a set of column indices by hook or by crook, based on what we have"""
        if indices is not None:
            return indices
        elif tag_pattern is not None:
            tag_pattern = hxl.model.TagPattern.parse(tag_pattern)
            return hxl.model.get_column_indices([tag_pattern], columns)
        else:
            raise hxl.HXLException("Internal error: rule test requires a tag pattern or a list of indices")
    def report_error(self, message, row=None, column=None, value=None, suggested_value=None, scope='cell'):
        """Report an error from this test, if there is a callback function available."""
        if self.callback:
            self.callback(HXLValidationException(
                message,
                value=value,
                row=row,
                column=column,
                suggested_value=suggested_value,
                scope=scope,
                is_external=False
            ))
        return False # for convenience
class RequiredTest(AbstractRuleTest):
    """Test min/max occurrence
    HXL schema: #valid_required
    If the columns don't exist at all, report only a single error.
    Otherwise, report an error for each row where the test fails.
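    Example (an illustrative sketch, mirroring how Schema.parse() adds tests to a rule):
        rule.tests.append(RequiredTest(min_occurs=1, max_occurs=3))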
    """
    def __init__(self, min_occurs=None, max_occurs=None):
        """Constructor
        @param min_occurs: minimum occurrence required (or None)
        @param max_occurs: maximum occurrence allowed (or None)
        """
        super().__init__()
        self.min_occurs = min_occurs
        self.max_occurs = max_occurs
        self.test_rows = True
    def start(self):
        self.test_rows = True
    def validate_dataset(self, dataset, indices=None, tag_pattern=None):
        """Verify that we have enough matching columns to satisfy the test"""
        status = True
        indices = self.get_indices(indices, tag_pattern, dataset.columns)
        if self.min_occurs is not None and len(indices) < self.min_occurs:
            self.test_rows = False # no point testing individual rows
            status = self.report_error(
                "Expected at least {} matching column(s)".format(self.min_occurs),
                scope='dataset'
            )
        return status
    def validate_row(self, row, indices=None, tag_pattern=None):
        """Check the number of non-empty occurrences in a row."""
        if not self.test_rows: # skip if there aren't enough columns
            return True # not a row-level error; already reported once at the dataset level
        status = True
        indices = self.get_indices(indices, tag_pattern, row.columns)
        non_empty_count = 0
        first_empty_column = None
        last_nonempty_column = None
        # iterate through all values in matching columns
        for i in indices:
            if i >= len(row.values) or hxl.datatypes.is_empty(row.values[i]):
                if first_empty_column is None:
                    first_empty_column = row.columns[i]
            else:
                non_empty_count += 1
                last_nonempty_column = row.columns[i]
        if self.min_occurs is not None and non_empty_count < self.min_occurs:
            status = self.report_error(
                "Expected at least {} matching non-empty value(s)".format(self.min_occurs),
                row=row,
                column=first_empty_column,
                scope='row'
            )
        if self.max_occurs is not None and non_empty_count > self.max_occurs:
            status = self.report_error(
                "Expected at most {} matching non-empty value(s)".format(self.max_occurs),
                row=row,
                column=last_nonempty_column,
                scope='row'
            )
        return status
    
class DatatypeTest(AbstractRuleTest):
    """Test for a specified datatype
    HXL schema: #valid_datatype-consistent (i.e. #valid_datatype without the +consistent attribute)
    See also ConsistentDatatypesTest, which infers the most-common datatype.
    """
    # allowed datatypes
    DATATYPES = ('text', 'number', 'url', 'email', 'phone', 'date',)
    def __init__(self, datatype):
        """Constructor
        @param datatype: a string specifying the datatype (e.g. "number")
        """
        super().__init__()
        # make sure we recognise the datatype
        datatype = hxl.datatypes.normalise_string(datatype)
        if datatype in DatatypeTest.DATATYPES:
            self.datatype = datatype
        else:
            raise hxl.HXLException("Unsupported datatype: {}".format(datatype))
    def validate_cell(self, value, row, column):
        """Validate datatypes on the individual cell level"""
        status = True
        def report(message):
            return self.report_error(
                message,
                value=value,
                row=row,
                column=column
            )
        
        if self.datatype == 'number':
            if not hxl.datatypes.is_number(value):
                status = report("Expected a number")
        elif self.datatype == 'url':
            pieces = urllib.parse.urlparse(value)
            if not (pieces.scheme and pieces.netloc):
                status = report("Expected a URL")
        elif self.datatype == 'email':
            if not re.match(r'^[^@]+@[^@]+$', value):
                status = report("Expected an email address")
        elif self.datatype == 'phone':
            if not re.match(r'^\+?[0-9xX()\s-]{5,}$', value):
                status = report("Expected a phone number")
        elif self.datatype == 'date':
            if not hxl.datatypes.is_date(value):
                status = report("Expected a date")
        return status
class RangeTest(AbstractRuleTest):
    """Test for a range of numbers or strings
    HXL schema: #valid_value+min and #valid_value+max
    This class tries to choose among date, numeric, and lexical comparison
    when deciding whether a value falls within the range. Ranges are always case-insensitive.
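    Example (an illustrative sketch):
        rule.tests.append(RangeTest(min_value='0', max_value='1000000'))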
    """
    def __init__(self, min_value=None, max_value=None):
        """Constructor
        @param min_value: the minimum allowed value, or None for no minimum
        @param max_value: the maximum allowed value, or None for no maximum
        """
        super().__init__()
        # normalise strings
        if min_value is not None:
            self.min_value = hxl.datatypes.normalise_string(min_value)
        else:
            self.min_value = None
        if max_value is not None:
            self.max_value = hxl.datatypes.normalise_string(max_value)
        else:
            self.max_value = None
        # precompile numbers and dates for efficiency
        try:
            self.min_num = hxl.datatypes.normalise_number(min_value)
        except ValueError:
            self.min_num = None
        try:
            self.max_num = hxl.datatypes.normalise_number(max_value)
        except ValueError:
            self.max_num = None
        try:
            self.min_date = hxl.datatypes.normalise_date(min_value)
        except ValueError:
            self.min_date = None
        try:
            self.max_date = hxl.datatypes.normalise_date(max_value)
        except ValueError:
            self.max_date = None
    def validate_cell(self, value, row, column):
        """Test that a value is >= min_value and/or <= max_value
        Includes special handling for numbers and dates.
        """
        def report(message):
            return self.report_error(
                message,
                value=value,
                row=row,
                column=column
            )
        # try as a date
        if column.tag == '#date' and (self.min_date is not None or self.max_date is not None):
            try:
                date_value = hxl.datatypes.normalise_date(value)
                if self.min_date is not None and date_value < self.min_date:
                    return report('Date must not be before {}'.format(self.min_date))
                elif self.max_date is not None and date_value > self.max_date:
                    return report('Date must not be after {}'.format(self.max_date))
                else:
                    return True
            except ValueError:
                pass # OK
        # try as a number
        if self.min_num is not None or self.max_num is not None:
            try:
                num_value = hxl.datatypes.normalise_number(value)
                if self.min_num is not None and num_value < self.min_num:
                    return report('Value must not be less than {}'.format(self.min_num))
                elif self.max_num is not None and num_value > self.max_num:
                    return report('Value must not be more than {}'.format(self.max_num))
                else:
                    return True
            except ValueError:
                pass # OK
        # try as a case-/whitespace-normalised string
        norm_value = hxl.datatypes.normalise_string(value)
        if self.min_value is not None and norm_value < self.min_value:
            return report('Value must not come before {}'.format(self.min_value))
        elif self.max_value is not None and norm_value > self.max_value:
            return report('Value must not come after {}'.format(self.max_value))
        else:
            return True
class WhitespaceTest(AbstractRuleTest):
    """Test for irregular whitespace in a cell 
    HXL schema: #valid_value+whitespace
    Irregular whitespace is any leading or trailing space, 
    or anything but a single space character inside a string
    """
    PATTERN = r'^(\s+.*|.*(\s\s|[\t\r\n]).*|.*\s+)$'
    """Regular expression to detect irregular whitespace"""
    def validate_cell(self, value, row, column):
        """Is there irregular whitespace?"""
        if hxl.datatypes.is_string(value):
            if re.match(WhitespaceTest.PATTERN, value):
                return self.report_error(
                    'Found extra whitespace',
                    value=value,
                    row=row,
                    column=column,
                    suggested_value=hxl.datatypes.normalise_space(value)
                )
        return True
        
class RegexTest(AbstractRuleTest):
    """Test that non-empty values match a regular expression 
    HXL schema: #valid_value+regex
    The regex is unanchored, so use '^' or '$' to anchor if needed
    Whitespace is not normalised before matching
    """
    def __init__(self, regex, case_sensitive=True):
        """Constructor
        @param regex: the regular expression to test against
        @param case_sensitive: if True (default), matches are case-sensitive
        """
        super().__init__()
        self.regex_text = str(regex)
        if case_sensitive:
            self.regex = re.compile(regex)
        else:
            self.regex = re.compile(regex, flags=re.IGNORECASE)
    def validate_cell(self, value, row, column):
        """Match value (including whitespace) against the regular expression"""
        if self.regex.search(value):
            return True
        else:
            return self.report_error(
                'Should match regular expression /{}/'.format(self.regex_text),
                value=value,
                row=row,
                column=column
            )
class UniqueValueTest(AbstractRuleTest):
    """Test that individual values are unique
    HXL schema: #valid_unique-key
    Every value in any column matching tag_pattern must be unique.
    Normalises case and whitespace before testing, so "Aaa" and "  aaa" 
    would count as duplicates.
    """
    def start(self):
        self.values_seen = set() # create the empty value set
    def end(self):
        self.values_seen = None # free some memory
        return True
    def validate_cell(self, value, row, column):
        """Report an error if we see the same (normalised) value more than once"""
        norm_value = hxl.datatypes.normalise_string(value)
        if norm_value in self.values_seen:
            return self.report_error(
                "Duplicate value",
                value=value,
                row=row,
                column=column
            )
        else:
            self.values_seen.add(norm_value)
            return True
class UniqueRowTest(AbstractRuleTest):
    """Test for duplicate rows, optionally using a list of tag patterns as a key
    HXL schema: #valid_unique+key
    If there are no tag patterns provided, uses the entire row to make the key
    Note that the target tag pattern (#valid_tag) is irrelevant for this test.
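    Example (an illustrative sketch; the key patterns are arbitrary):
        rule.tests.append(UniqueRowTest('#org,#sector,#adm1'))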
    """
    def __init__(self, tag_patterns=None):
        """Constructor
        If no tag patterns are supplied, test the whole row.
        @param tag_patterns: list of tag patterns to test
        """
        super().__init__()
        if tag_patterns is not None:
            self.tag_patterns = hxl.model.TagPattern.parse_list(tag_patterns)
        else:
            self.tag_patterns = None
    def start(self):
        self.keys_seen = set() # create the empty key set
    def end(self):
        self.keys_seen = None # free some memory
        return True
    def validate_row(self, row, indices=None, tag_pattern=None):
        key = row.key(self.tag_patterns)
        if key in self.keys_seen:
            return self.report_error(
                'Duplicate row according to key values {}'.format(str(key)),
                row=row,
                scope='row'
            )
        else:
            self.keys_seen.add(key)
            return True
class EnumerationTest(AbstractRuleTest):
    """Test against a list of enumerated values 
    HXL schema: #valid_value+list #valid_value+url #valid_value+case
    This test can also hold an extra error to report if there was a problem
    e.g. reading the values externally.
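    Example (an illustrative sketch):
        test = EnumerationTest(['health', 'education', 'wash'])
        # "Helth" would fail, with "health" offered as the suggested value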
    """
    def __init__(self, allowed_values, case_sensitive=False):
        """Constructor
        @param allowed_values: sequence of allowed values
        @param case_sensitive: if True, make comparisons case-sensitive
        """
        super().__init__()
        self.case_sensitive = case_sensitive
        self.setup_tables(allowed_values)
    def setup_tables(self, allowed_values):
        self.suggested_value_cache = dict()
        self.cooked_value_set = set()
        if self.case_sensitive:
            for raw_value in allowed_values:
                raw_value = hxl.datatypes.normalise_space(raw_value)
                self.cooked_value_set.add(raw_value)
        else:
            self.raw_value_map = dict()
            for raw_value in allowed_values:
                raw_value = hxl.datatypes.normalise_space(raw_value)
                cooked_value = hxl.datatypes.normalise_string(raw_value)
                self.cooked_value_set.add(cooked_value)
                self.raw_value_map[cooked_value] = raw_value
    def validate_cell(self, value, row, column):
        if self.case_sensitive:
            cooked_value = hxl.datatypes.normalise_space(value)
        else:
            cooked_value = hxl.datatypes.normalise_string(value)
        if cooked_value in self.cooked_value_set:
            return True
        else:
            suggested_value = self.get_suggested_value(cooked_value)
            return self.report_error(
                "Value not allowed",
                value=value,
                row=row,
                column=column,
                suggested_value=suggested_value
            )
    def get_suggested_value(self, value):
        # try the cache first
        suggested_value = self.suggested_value_cache.get(value, False) # False to allow for None values
        if suggested_value is False:
            suggested_value = find_closest_match(value, self.cooked_value_set)
            if suggested_value is not None and not self.case_sensitive:
                # we need the original character case if case-insensitive
                suggested_value = self.raw_value_map[suggested_value]
            self.suggested_value_cache[value] = suggested_value
        return suggested_value
    
class CorrelationTest(AbstractRuleTest):
    """Test for correlations with other values
    #valid_correlation
    Supply a list of tag patterns, and report any outliers that don't
    correlate with those columns.
    TODO: this might be more efficient with pre-scanning
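    Example (an illustrative sketch): a rule on #adm1+name with
    CorrelationTest(['#adm1+code']) will flag a name that doesn't match the
    most-common name used with each code, suggesting that name instead.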
    """
    def __init__(self, tag_patterns):
        """Constructor
        @param tag_patterns: a list of tag patterns for the correlation
        """
        super().__init__()
        self.tag_patterns = hxl.model.TagPattern.parse_list(tag_patterns)
    def start(self):
        """Build an empty correlation map"""
        self.correlation_map = dict()
        self.correlation_indices = None
    def end(self):
        """All the error reporting happens here.
        We don't know the most-common correlations for each key until the end.
        Use the most-common value for each correlation key as the suggested
        value for the others.
        """
        status = True
        for tagspec, correlations in self.correlation_map.items():
            for key, value_maps in correlations.items():
                if len(value_maps) > 1:
                    status = False
                    value_maps = sorted(
                        value_maps.items(),
                        key=lambda e: len(e[1]),
                        reverse=True
                    )
                    suggested_value = value_maps[0][0]
                    for value_map in value_maps[1:]:
                        value = value_map[0]
                        for location in value_map[1]:
                            self.report_error(
                                "Unexpected value",
                                value=value,
                                row=location[0],
                                column=location[1],
                                suggested_value=suggested_value
                            )
        self.correlation_map = None
        return status
    def validate_row(self, row, indices=None, tag_pattern=None):
        """Row validation always succeeds
        We use this method to capture the correlation keys,
        so that we can report on them in the end() method.
        """
        # will cache, so that we don't repeat the calculation
        indices = self.get_indices(indices, tag_pattern, row.columns)
        # calculate the correlation-column indices only once as well
        if self.correlation_indices is None:
            self.correlation_indices = hxl.model.get_column_indices(self.tag_patterns, row.columns)
        # Make the correlation key
        key = row.key(indices=self.correlation_indices)
        # Record the locations
        # key -> value -> location
        for i in indices:
            tagspec = row.columns[i].get_display_tag(sort_attributes=True)
            if i < len(row.values) and not hxl.datatypes.is_empty(row.values[i]):
                value = row.values[i]
                column = row.columns[i]
                self.correlation_map.setdefault(tagspec, {}).setdefault(key, {}).setdefault(value, []).append((row, column,))
        # always succeed
        return True
    
class ConsistentDatatypesTest(AbstractRuleTest):
    """Check for consistent datatypes in a column.
    HXL: #valid_datatype+consistent
    Will report all but the most-common datatype as errors.
    Has special knowledge of the #date hashtag (see guess_type()).
    """
    def needs_scan(self):
        """We want to prescan the dataset"""
        return True
    def start(self):
        self.datatype_map = dict()
    def scan_cell(self, value, row, column):
        datatype = self.guess_type(value, column)
        tagspec = column.get_display_tag(sort_attributes=True) # FIXME
        # keep track of how often the type appeared
        if not tagspec in self.datatype_map:
            self.datatype_map[tagspec] = {}
        if not datatype in self.datatype_map[tagspec]:
            self.datatype_map[tagspec][datatype] = 0
        self.datatype_map[tagspec][datatype] += 1
    def end_scan(self):
        """Reduce the datatype_map to the most-common type for each tagspec"""
        for tagspec, datatypes in self.datatype_map.items():
            max_type = None
            max_count = None
            for datatype, count in datatypes.items():
                if max_count is None or count > max_count:
                    max_count = count
                    max_type = datatype
            self.datatype_map[tagspec] = max_type
    def validate_cell(self, value, row, column):
        """Keep track of each different datatype
        Error reporting happens in the end() method, once we know which
        type is most common.
        @returns: always True
        """
        actual_datatype = self.guess_type(value, column)
        tagspec = column.get_display_tag(sort_attributes=True) # FIXME
        expected_datatype = self.datatype_map.get(tagspec)
        if actual_datatype == expected_datatype:
            return True
        else:
            message = "Inconsistent data types: expected {} but found {}".format(expected_datatype, actual_datatype)
            return self.report_error(
                message,
                value=value,
                row=row,
                column=column
            )
    def guess_type(self, value, column):
        """Guess the type of a value"""
        if column.tag == '#date' and hxl.datatypes.is_date(value):
            return 'date'
        elif hxl.datatypes.is_number(value):
            return 'number'
        else:
            return 'text'
class SpellingTest(AbstractRuleTest):
    """Detect spelling outliers in a column
    HXL schema: #valid_value+spelling
    Will treat numbers and dates as strings, so use this only in columns where
    you expect text, and frequently-repeated values (e.g. #status, #org+name, #sector+name).
    Will skip validation if the coefficient of variation > 1.0
    Collects all of the spelling variants first, then checks the rare ones in the end() method, and
    reports any ones that have near matches among the common ones.
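    Illustrative calculation: with 600 total occurrences spread over 20
    distinct spellings, the mean frequency is 30, so only spellings appearing
    at most 30 * 0.05 = 1.5 times (i.e. once) are checked for near matches.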
    """
    ERROR_CUTOFF=0.05 # 5%
    """Cutoff for reporting a possible error, as percentage of mean frequency for each spelling"""
    def __init__(self, case_sensitive=False):
        """Constructor
        @param case_sensitive: if True, differences in case are considered errors (default False)
        """
        super().__init__()
        self.case_sensitive = case_sensitive
    def start(self):
        """Set up for a validation run"""
        # spelling -> locations
        self.spelling_map = dict()
        """Store spellings and locations by tagspec"""
        
        self.total_occurrences = dict()
        """Count the total spelling occurrences, for mean and standard deviation, by tagspec"""
    def end(self):
        """Report possible spelling errors.
        Collect all spellings that appear less than 1/3 of the mean frequency.
        For each of these, check whether there's a close match among
        the more-common spellings, and if so, then report (once for
        each location) with the suggested correction.
        """
        # start by assuming all is well
        status = True
        for tagspec, spellings in self.spelling_map.items():
            # cache corrections so that we don't keep looking up the same ones
            correction_cache = {}
            # if there aren't any spellings for this tagspec, skip to the next one
            if len(spellings) == 0:
                continue
            # get the average (mean) occurrences for each spelling
            mean_frequency = self.total_occurrences[tagspec] / len(spellings)
            # calculate the coefficient of variation (dimensionless)
            standard_deviation = math.sqrt(
                sum(map(lambda n: (len(n)-mean_frequency)**2, spellings.values())) / len(spellings)
            )
            variance_coefficient = standard_deviation / mean_frequency
            # there's no point in spell-checking unless the variation coefficient is low enough to be meaningful
            if variance_coefficient > 1.0:
                continue # skip this tagspec, but still check the others
            # first pass: collect and clear good spellings
            good_spellings = list()
            for spelling, locations in spellings.items():
                if len(locations) > mean_frequency * SpellingTest.ERROR_CUTOFF:
                    good_spellings.append(spelling)
                    spellings[spelling] = None # no potential errors
            # second pass: report any remaining dubious spellings that have close matches among good spellings
            for spelling, locations in spellings.items():
                if locations is None: # this spelling was OK
                    continue
                # is there a near match among good spellings?
                if spelling in correction_cache:
                    correction = correction_cache[spelling]
                else:
                    correction = find_closest_match(spelling, good_spellings)
                    correction_cache[spelling] = correction
                if correction is not None:
                    # if it's rare and there's a near match, report an error
                    status = False
                    for location in locations:
                        self.report_error(
                            'Possible spelling error',
                            value=location[2],
                            row=location[0],
                            column=location[1],
                            suggested_value=correction,
                            scope='cell'
                        )
                
        return status # false if we've found a possible correction
    def validate_cell(self, value, row, column):
        """Record all the spellings found, for later sorting"""
        tagspec = column.get_display_tag(sort_attributes=True) # FIXME
        if self.case_sensitive:
            cooked_value = hxl.datatypes.normalise_space(value)
        else:
            cooked_value = hxl.datatypes.normalise_string(value)
        self.total_occurrences[tagspec] = self.total_occurrences.setdefault(tagspec, 0) + 1
        self.spelling_map.setdefault(tagspec, {}).setdefault(cooked_value, []).append((row, column, value,))
        return True
    
class NumericOutlierTest(AbstractRuleTest):
    """Detect outliers among matching values
    Will skip any tagset with a coefficient of variation > 1.0
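    Reports a value as a possible outlier when it lies more than three
    standard deviations from the column mean (see validate_cell() below).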
    """
    def needs_scan(self):
        return True
    def start(self):
        self.standard_deviations = dict()
        self.mean_values = dict()
        self.variation_coefficients = dict()
        self.values = dict()
    def scan_cell(self, value, row, column):
        tagspec = column.get_display_tag(sort_attributes=True) # FIXME
        try:
            num = hxl.datatypes.normalise_number(value)
            if not tagspec in self.values:
                self.values[tagspec] = list()
            self.values[tagspec].append(num)
        except ValueError:
            # not a number, so ignore
            pass
    def end_scan(self):
        for tagspec in self.values:
            values = self.values[tagspec]
            # if the list is long enough, remove the min and max values
            if len(values) >= 10: # 10 is our cutoff for removing lowest and highest values
                values = sorted(values)[1:-1]
            # now calculate the standard deviation of the remaining values
            self.mean_values[tagspec] = sum(values) / len(values)
            self.standard_deviations[tagspec] = math.sqrt(
                sum(map(lambda n: (n-self.mean_values[tagspec])*(n-self.mean_values[tagspec]), values)) / len(values)
            )
            # calculate the coefficient of variance, which we'll use as a cutoff
            if self.mean_values[tagspec]:
                self.variation_coefficients[tagspec] = self.standard_deviations[tagspec] / self.mean_values[tagspec]
            else:
                self.variation_coefficients[tagspec] = 0
        # free some memory
        del self.values
    def validate_cell(self, value, row, column):
        tagspec = column.get_display_tag(sort_attributes=True) # FIXME
        # don't bother if the data is highly variable
        if self.variation_coefficients.get(tagspec, 0.0) > 1.0:
            return True
        # try numeric validation
        try:
            num = hxl.datatypes.normalise_number(value)
            distance = abs(num - self.mean_values[tagspec])
            if distance > self.standard_deviations[tagspec] * 3:
                self.report_error(
                    "Possible numeric outlier",
                    value=value,
                    row=row,
                    column=column
                )
                return False
        except ValueError:
            # not a number, so ignore
            pass
        return True
    
#
# A single rule (containing one or more tests) within a schema
#
class SchemaRule(object):
    """A single rule within a schema.
    A rule contains one or more tests, together with some common metadata
    (a tag pattern, severity level, and description). If any test fails, then
    the whole rule fails.
    Workflow (triggered by Schema.validate()):
    - needs_scan()
    - start()
    - scan_row() for each row (if needs_scan() returned True)
    - scan_cell() for each matching non-empty cell (if needs_scan() returned True)
    - end_scan() (if needs_scan() returned True)
    - validate_dataset()
    - validate_row() for each row
    - validate_cell() for each matching non-empty cell in each row
    - end()
    """
    SEVERITY = ('info', 'warning', 'error',)
    def __init__(self, tag_pattern, severity="error", description=None, callback=None):
        """Constructor
        @param tag_pattern: the tag pattern to match for the rule
        @param severity: one of 'info', 'warning', or 'error' (default)
        @param description: an optional error message to override the default from the tests
        @param callback: an optional callback function to handle error reports
        """
        self.tag_pattern = hxl.TagPattern.parse(tag_pattern)
        self.description = description
        self.callback = callback
        # make sure the severity level is valid
        severity = hxl.datatypes.normalise_string(severity)
        if severity in SchemaRule.SEVERITY:
            self.severity = severity
        else:
            raise hxl.HXLException("Unsupported rule severity level: {}".format(severity))
        # Additional internal variables
        self.tests = []
        """List of AbstractRuleTest objects to apply as part of this rule"""
        self.external_errors = []
        """Errors external to the dataset itself (e.g. missing taxonomies)"""
        self.saved_indices = None
        """List of saved column indices matching tag_pattern"""
    def needs_scan(self):
        """Check if any test in this rule requires a pre-scan of the dataset.
        @returns: True if at least one test's needs_scan() method returns True
        """
        for test in self.tests:
            if test.needs_scan():
                return True
        return False
    def start(self):
        """Initialisation method
        Call after all values have been set, but before use
        """
        self.saved_indices = None
        def test_callback(e):
            """Relay error reports from tests, with rule-level context added"""
            e.rule = self
            if self.callback:
                self.callback(e)
        # (re)initialise all the tests
        for test in self.tests:
            test.callback = test_callback # call back to here
            test.start()
    def end(self):
        """Call at end of parse to get post-parse errors"""
        status = True
        # finish all the tests
        for test in self.tests:
            if not test.end():
                status = False
        self.saved_indices = None
        return status
    def scan_row(self, row):
        """Pre-scan a row and its individual cells.
        This method does not report errors or return a status.
        Will be invoked only if needs_scan() returned True
        Calls both scan_row() and scan_cell() for each test.
        @param row: the Row to scan
        """
        if self.saved_indices is None:
            self.saved_indices = hxl.model.get_column_indices([self.tag_pattern], row.columns)
        # scan each row, then each matching cell in the row
        for test in self.tests:
            if not test.needs_scan(): # don't invoke unless the test asked for it
                continue
            test.scan_row(row, self.saved_indices)
            for i in self.saved_indices: # validate individual cells
                if i < len(row.values) and not hxl.datatypes.is_empty(row.values[i]):
                    test.scan_cell(row.values[i], row, row.columns[i])
    def end_scan(self):
        """Invoke end_scan() for all tests that need it"""
        for test in self.tests:
            if test.needs_scan():
                test.end_scan()
                    
    def validate_dataset(self, dataset, indices=None, tag_pattern=None):
        """Test whether the columns are present to satisfy this rule."""
        
        status = True
        if self.saved_indices is None:
            self.saved_indices = hxl.model.get_column_indices([self.tag_pattern], dataset.columns)
        # Report any external errors
        for error in self.external_errors:
            if self.callback:
                self.callback(error)
        # run each of the tests
        for test in self.tests:
            if not test.validate_dataset(dataset, indices=self.saved_indices):
                status = False
        return status
    def validate_row(self, row):
        """
        Apply the rule to an entire Row
        @param row: the Row to validate
        @returns: True if all matching values in the row are valid
        """
        # individual rules may change to False
        status = True
        if self.saved_indices is None:
            self.saved_indices = hxl.model.get_column_indices([self.tag_pattern], row.columns)
        # run each test on the complete row, then on individual cells
        for test in self.tests:
            if not test.validate_row(row, self.saved_indices):
                status = False
            for i in self.saved_indices: # validate individual cells
                if i < len(row.values) and not hxl.datatypes.is_empty(row.values[i]):
                    if not test.validate_cell(row.values[i], row, row.columns[i]):
                        status = False
        return status
    def __str__(self):
        """String representation of a rule (for debugging)"""
        return "<HXL schema rule: " + str(self.tag_pattern) + ">"
                
class Schema(object):
    """Schema against which to validate a HXL document.
    Consists of a sequence of L{SchemaRule} objects to apply to the data.
    The validate() method triggers the following:
    - start()
    - validate_dataset()
    - validate_row() for each row
    - validate_cell() for each matching non-empty cell in each row
    - end()
    Add new rules with
        schema.rules.append(rule)
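    A fuller illustrative sketch:
        schema = Schema()
        rule = SchemaRule('#sector', severity='warning')
        rule.tests.append(WhitespaceTest())
        schema.rules.append(rule)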
    """
    def __init__(self, callback=None):
        """Constructor
        @param callback: a callback function to receive HXLValidationException objects as error reports
        """
        self.rules = []
        """Rules making up this schema"""
        
        self.callback = callback
        """Callback function to receive error reports"""
    def validate(self, source):
        """Execute the main validation workflow.
        @param source: the hxl.model.Dataset to validate
        @returns: True if the dataset passes validation
        """
        status = True # all is well at the beginning
        needs_scan = False # assume we don't need a pre-scan
        # do we need a cached, in-memory dataset?
        for rule in self.rules:
            if rule.needs_scan():
                needs_scan = True
                if not source.is_cached:
                    source = source.cache()
                break
        # initial setup
        self.start()
        # pre-scan if needed
        if needs_scan:
            for row in source:
                self.scan_row(row)
            self.end_scan()
        # dataset-level validations
        if not self.validate_dataset(source):
            status = False
        # row-level validations
        # (will also include cell-level validations)
        for row in source:
            if not self.validate_row(row):
                status = False
        # finalisation
        if not self.end():
            status = False
            
        return status
    def start(self):
        """Initialise the validation run"""
        def rule_callback(e):
            """Relay rule callbacks"""
            if self.callback:
                self.callback(e)
                
        for rule in self.rules:
            rule.callback = rule_callback
            rule.start()
    def end(self):
        """Terminate the validation run"""
        status = True
        for rule in self.rules:
            if not rule.end():
                status = False
        return status
    def scan_row(self, row):
        """Pre-scan a row, only for rules that require it."""
        for rule in self.rules:
            if rule.needs_scan():
                rule.scan_row(row)
    def end_scan(self):
        """End pre-scan, for rules that require it."""
        for rule in self.rules:
            if rule.needs_scan():
                rule.end_scan()
    def validate_dataset(self, dataset):
        """Validate just at the dataset level
        @param dataset: the hxl.model.Dataset object to validate
        """
        status = True
        for rule in self.rules:
            if not rule.validate_dataset(dataset):
                status = False
        return status
    def validate_row(self, row):
        """Validate at the row and cell levels.
        Each rule will handle cell-level validation on its own,
        because it knows what columns to look at.
        @param row: the row to validate
        """
        status = True
        for rule in self.rules:
            if not rule.validate_row(row):
                status = False
        return status
    def __str__(self):
        """String representation of a schema (for debugging)"""
        s = "<HXL schema\n"
        for rule in self.rules:
            s += "  " + str(rule) + "\n"
        s += ">"
        return s
    @staticmethod
    def parse(source=None, callback=None):
        """ Load a HXL schema from the provided input stream, or load default schema.
        @param source: HXL data source for the scheme (e.g. a HXLReader or filter); defaults to the built-in schema
        @param callback: a callback function for reporting errors (receives a HXLValidationException)
        """
        # Catch special cases
        if source is None:
            # Use the built-in default schema and recurse
            path = os.path.join(os.path.dirname(__file__), 'hxl-default-schema.json')
            with hxl.data(path, hxl.InputOptions(allow_local=True)) as source:
                return Schema.parse(source, callback)
        if isinstance(source, Schema):
            # Already a schema; set the callback and return it
            source.callback = callback
            return source
        if not isinstance(source, hxl.model.Dataset):
            # Not already a dataset, so wrap it and recurse
            with hxl.data(source) as source:
                return Schema.parse(source, callback)
        # Main parsing
        schema = Schema(callback=callback)
        def parse_type(type):
            if type:
                type = type.lower()
                type = re.sub(r'[^a-z_-]', '', type) # normalise
            if type in DatatypeTest.DATATYPES: # the DATATYPES list lives on DatatypeTest, not SchemaRule
                return type
            else:
                return None
        def to_int(s):
            if s:
                return int(s)
            else:
                return None
        def to_float(s):
            if s:
                return float(s)
            else:
                return None
        def to_boolean(s):
            if not s or s.lower() in ['0', 'n', 'no', 'f', 'false']:
                return False
            elif s.lower() in ['y', 'yes', 't', 'true']:
                return True
            else:
                raise hxl.HXLException('Unrecognised true/false value: {}'.format(s))
        for row in source:
            tags = row.get('#valid_tag')
            if tags:
                tag_patterns = hxl.model.TagPattern.parse_list(tags)
                for tag_pattern in tag_patterns:
                    rule = SchemaRule(tag_pattern)
                    rule.severity = row.get('#valid_severity') or 'error'
                    rule.description = row.get('#description')
                    # for later use
                    case_sensitive = to_boolean(row.get('#valid_value+case'))
                    if to_boolean(row.get('#valid_required-min-max')):
                        rule.tests.append(RequiredTest(min_occurs=1, max_occurs=None))
                    min_occurs = to_int(row.get('#valid_required+min'))
                    max_occurs = to_int(row.get('#valid_required+max'))
                    if min_occurs is not None or max_occurs is not None:
                        rule.tests.append(RequiredTest(min_occurs=min_occurs, max_occurs=max_occurs))
                    datatype = row.get('#valid_datatype-consistent')
                    if datatype is not None:
                        rule.tests.append(DatatypeTest(datatype))
                    min_value = row.get('#valid_value+min')
                    max_value = row.get('#valid_value+max')
                    if min_value is not None or max_value is not None:
                        rule.tests.append(RangeTest(min_value=min_value, max_value=max_value))
                    if to_boolean(row.get('#valid_value+whitespace')):
                        rule.tests.append(WhitespaceTest())
                    regex = row.get('#valid_value+regex')
                    if regex is not None:
                        rule.tests.append(RegexTest(regex, case_sensitive))
                    if to_boolean(row.get('#valid_value+spelling')):
                        rule.tests.append(SpellingTest(case_sensitive=case_sensitive))
                    if to_boolean(row.get('#valid_unique-key')):
                        rule.tests.append(UniqueValueTest())
                    key = row.get('#valid_unique+key')
                    if hxl.datatypes.is_truthy(key):
                        # could be problematic if there's ever a hashtag like #true or #yes
                        rule.tests.append(UniqueRowTest())
                    elif not hxl.datatypes.is_empty(key):
                        rule.tests.append(UniqueRowTest(key))
                    correlations = row.get('#valid_correlation')
                    if not hxl.datatypes.is_empty(correlations):
                        rule.tests.append(CorrelationTest(correlations))
                    if to_boolean(row.get('#valid_datatype+consistent')):
                        rule.tests.append(ConsistentDatatypesTest())
                    if to_boolean(row.get('#valid_value+outliers')):
                        rule.tests.append(NumericOutlierTest())
                    l = row.get('#valid_value+list')
                    if not hxl.datatypes.is_empty(l):
                        allowed_values = re.split(r'\s*\|\s*', l)
                        if len(allowed_values) > 0:
                            rule.tests.append(EnumerationTest(allowed_values, case_sensitive))
                    url = row.get('#valid_value+url')
                    if not hxl.datatypes.is_empty(url):
                        # default the target tag to the #valid_tag
                        target_tag = row.get('#valid_value+target_tag', default=tag_pattern)
                        try:
                            # read the values from an external dataset
                            value_source = hxl.data(url) # use a distinct name; don't shadow the schema source
                            allowed_values = value_source.get_value_set(target_tag)
                            if len(allowed_values) > 0:
                                rule.tests.append(EnumerationTest(allowed_values, case_sensitive))
                        except BaseException as error:
                            # don't add the test to the rule
                            # do add an error about loading the values
                            rule.external_errors.append(HXLValidationException(
                                'Error loading allowed values from {}: {}'.format(url, error.args[0]),
                                scope='dataset',
                                rule=rule,
                                is_external=True
                            ))
                    schema.rules.append(rule)
        return schema
#
# Internal helper functions
#
def find_closest_match(s, allowed_values):
    """Find the closest match for a value from a list.
    This is not super efficient right now; look at
    https://en.wikipedia.org/wiki/Edit_distance for better algorithms.
    Uses a cutoff of len(s)/3 for candidate matches, and if two matches have the same 
    edit distance, prefers the one with the longer common prefix.
    @param s: the misspelled string to check
    @param allowed_values: a list of allowed values
    @return: the best match, or None if there was no candidate
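    Example (an illustrative sketch):
        find_closest_match('helth', ['health', 'education'])  # -> 'health'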
    """
    best_match = None
    max_distance = len(s) / 3
    for value in allowed_values:
        distance = get_edit_distance(s, value)
        if (best_match is not None and distance > best_match[1]) or (distance > max_distance):
            continue
        prefix_len = get_common_prefix_len(s, value)
        if (best_match is None) or (distance < best_match[1]) or (distance == best_match[1] and prefix_len > best_match[2]):
            best_match = (value, distance, prefix_len,)
    if best_match is None:
        return None
    else:
        return best_match[0]
def get_common_prefix_len(s1, s2):
    """Return the longest common prefix of two strings
    Adopted from example in https://stackoverflow.com/questions/9114402/regexp-finding-longest-common-prefix-of-two-strings
    @param s1: the first string to compare
    @param s2: the second string to compare
    @returns: the length of the longest common prefix of the two strings
    """
    prefix_len = 0
    for x, y in zip(s1, s2):
        if x != y:
            break
        prefix_len += 1
    return prefix_len
def get_edit_distance(s1, s2):
    """Calculate the Levenshtein distance between two normalised strings
    Adapted from an example in https://stackoverflow.com/questions/2460177/edit-distance-in-python
    See https://en.wikipedia.org/wiki/Edit_distance
    @param s1: the first string to compare
    @param s2: the second string to compare
    @returns: an integer giving the edit distance
    """
    if len(s1) > len(s2):
        s1, s2 = s2, s1
    distances = range(len(s1) + 1)
    for i2, c2 in enumerate(s2):
        distances_ = [i2+1]
        for i1, c1 in enumerate(s1):
            if c1 == c2:
                distances_.append(distances[i1])
            else:
                distances_.append(1 + min((distances[i1], distances[i1 + 1], distances_[-1])))
        distances = distances_
    return distances[-1]
#
# Exported functions
#
def schema(source=None, callback=None):
    """Convenience function for making a schema
    Imported into __init__, and usually called as hxl.schema(source, callback).
    The callback, if provided, will receive a HXLValidationException object for each error
    @param source: something that can be used as a HXL data source
    @param callback: the validation callback function to use
    """
    return Schema.parse(source, callback)
def validate(data, schema=None):
    """Convenience function for validating HXL data.
    The is_valid field in the report will be True or False, giving the overall result.
    If you need more control, you can pass pre-constructed hxl.data() objects in, e.g.
    result = hxl.validate(hxl.data('foo.csv', allow_local=True), hxl.data('schema.csv', allow_local=True))
    @param data: the data to validate (a URL or anything else accepted by hxl.data)
    @param schema: the schema to validate against (anything accepted by L{hxl.data}), or None (default) to use the built-in schema.
    @returns: a JSON validation report as documented at https://github.com/HXLStandard/hxl-proxy/wiki/Validation-reports
    """
    issue_map = dict()
    external_issue_map = dict()
    def add_issue(issue):
        rule_hash = make_rule_hash(issue.rule)
        if issue.is_external:
            external_issue_map.setdefault(rule_hash, []).append(issue)
        else:
            issue_map.setdefault(rule_hash, []).append(issue)
    status = hxl.schema(schema, callback=add_issue).validate(hxl.data(data))
    schema_url = None
    data_url = None
    if hxl.datatypes.is_string(schema):
        schema_url = schema
    if hxl.datatypes.is_string(data):
        data_url = data
    return make_json_report(status, issue_map, external_issue_map, schema_url=schema_url, data_url=data_url)
#
# Local functions
#
def make_json_report(status, issue_map, external_issue_map, schema_url=None, data_url=None):
    """Generate a JSON error report from a dict of errors
    @param status: the validation status (boolean)
    @param issue_map: a dict of lists of HXLValidationException objects grouped by rule hash
    @param external_issue_map: a dict of lists of external HXLValidationException objects grouped by rule hash
    @param data_url: the original URL of the data, if available
    @param schema_url: the original URL of the schema, if available
    """
    json_report = {
        "validator": "libhxl-python",
        "timestamp": datetime.datetime.now().isoformat(),
        "is_valid": status,
        "stats": {
            "info": 0,
            "warning": 0,
            "error": 0,
            "external": 0,
            "total": 0,
        },
        "issues": [],
        "external_issues": [],
    }
    if schema_url is not None:
        json_report['schema_url'] = schema_url
    if data_url is not None:
        json_report['data_url'] = data_url
    # add the issue objects
    for rule_id, locations in issue_map.items():
        json_issue = make_json_issue(rule_id, locations)
        json_report['stats']['total'] += len(json_issue['locations'])
        json_report['stats'][locations[0].rule.severity] += len(json_issue['locations'])
        json_report['issues'].append(json_issue)
    # add the external issue objects
    for rule_id, locations in external_issue_map.items():
        json_issue = make_json_issue(rule_id, locations, is_external=True)
        json_report['stats']['total'] += 1
        json_report['stats']['external'] += 1
        json_report['external_issues'].append(json_issue)
    return json_report
def make_json_issue(rule_id, locations, is_external=False):
    """Create an issue (with list of locations) for a JSON validation report
    @param rule_id: the hash for the rule (used to group locations)
    @param locations: a list of HXLValidationException objects
    @param is_external: if True, the issue is external to the data itself
    """
    # grab first location as a model
    model = locations[0]
    # get a custom description first, then the generic message as a fallback
    description = model.rule.description
    if not description:
        description = model.message
    # make the issue
    json_issue = {
        "rule_id": rule_id,
        "tag_pattern": str(model.rule.tag_pattern),
        "description": description,
        "severity": model.rule.severity,
        "scope": model.scope,
    }
    # get all unique locations
    if is_external:
        json_issue["description"] = model.message
    else:
        location_keys = set()
        json_locations = []
        for location in locations:
            row_number = location.row.row_number if location.row else None
            column_number = location.column.column_number if location.column else None
            location_key = (row_number, column_number, location.value, location.suggested_value,)
            if location_key not in location_keys:
                json_locations.append(make_json_location(location))
                location_keys.add(location_key)
        json_issue["location_count"] = len(locations)
        json_issue["locations"] = json_locations
    return json_issue
def make_json_location(location):
    """Create a single location for a JSON validation report"""
    json_location = {}
    # is there a row object?
    if location.row is not None:
        if location.row.row_number is not None:
            json_location['row'] = location.row.row_number
        if location.row.source_row_number is not None:
            json_location['source_row'] = location.row.source_row_number
    # is there a column object?
    if location.column is not None:
        if location.column.column_number is not None:
            json_location['col'] = location.column.column_number
        if location.column.display_tag is not None:
            json_location['hashtag'] = location.column.display_tag
    # is there an error value?
    if location.value is not None:
        json_location['location_value'] = location.value
    # is there a suggested value?
    if location.suggested_value is not None:
        json_location['suggested_value'] = location.suggested_value
    return json_location
def make_rule_hash(rule):
    """Make a good-enough hash for a rule."""
    s = "\r".join([str(rule.severity), str(rule.description), str(rule.tag_pattern)])
    return base64.urlsafe_b64encode(hashlib.md5(s.encode('utf-8')).digest())[:8].decode('ascii')
# end
Functions
- def find_closest_match(s, allowed_values)
- 
Find the closest match for a value from a list. This is not super efficient right now; look at https://en.wikipedia.org/wiki/Edit_distance for better algorithms. Uses a cutoff of len(s)/3 for candidate matches, and if two matches have the same edit distance, prefers the one with the longer common prefix.
@param s: the misspelled string to check
@param allowed_values: a list of allowed values
@return: the best match, or None if there was no candidate
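An illustrative call (the values are invented): a misspelling within the edit-distance cutoff gets a suggestion, while an unrelated value gets None:

    find_closest_match('Kenia', ['Kenya', 'Uganda', 'Somalia'])   # 'Kenya' (edit distance 1, within the len('Kenia')/3 cutoff)
    find_closest_match('Peru', ['Kenya', 'Uganda', 'Somalia'])    # None (no candidate within the cutoff)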
- def get_common_prefix_len(s1, s2)
- 
Return the length of the longest common prefix of two strings. Adapted from an example in https://stackoverflow.com/questions/9114402/regexp-finding-longest-common-prefix-of-two-strings
@param s1: the first string to compare
@param s2: the second string to compare
@returns: the length of the longest common prefix of the two strings
- def get_edit_distance(s1, s2)
- 
Calculate the Levenshtein distance between two normalised strings. Adapted from an example in https://stackoverflow.com/questions/2460177/edit-distance-in-python
See https://en.wikipedia.org/wiki/Edit_distance
@param s1: the first string to compare
@param s2: the second string to compare
@returns: an integer giving the edit distance
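A quick check with the classic Levenshtein example, plus the prefix helper above:

    get_edit_distance('kitten', 'sitting')      # 3 (two substitutions and one insertion)
    get_common_prefix_len('sitting', 'sitter')  # 4 ('s', 'i', 't', 't')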
- def make_json_issue(rule_id, locations, is_external=False)
- 
Create an issue (with a list of locations) for a JSON validation report.
@param rule_id: the hash for the rule (used to group locations)
@param locations: a list of HXLValidationException objects
@param is_external: if True, the issue is external to the data itself
- def make_json_location(location)
- 
Create a single location for a JSON validation report.
- def make_json_report(status, issue_map, external_issue_map, schema_url=None, data_url=None)
- 
Generate a JSON error report from a dict of errors.
@param status: the validation status (boolean)
@param issue_map: a dict of lists of HXLValidationException objects grouped by rule hash
@param external_issue_map: a dict of lists of external HXLValidationException objects grouped by rule hash
@param schema_url: the original URL of the schema, if available
@param data_url: the original URL of the data, if available
- def make_rule_hash(rule)
- 
Make a good-enough hash for a rule.
- def schema(source=None, callback=None)
- 
Convenience function for making a schema. Imported into __init__, and usually called as hxl.schema(source, callback). The callback, if provided, will receive a HXLValidationException object for each error.
@param source: something that can be used as a HXL data source
@param callback: the validation callback function to use
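For example (my-schema.csv is an invented local filename), a schema can also be parsed from a pre-constructed data source rather than a URL:

    import hxl
    my_schema = hxl.schema(hxl.data('my-schema.csv', allow_local=True))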
- def validate(data, schema=None)
- 
Convenience function for validating HXL data. The is_valid field in the report will be True or False, giving the overall result. If you need more control, you can pass pre-constructed hxl.data() objects in, e.g.

    result = hxl.validate(hxl.data('foo.csv', allow_local=True), hxl.data('schema.csv', allow_local=True))

@param data: the data to validate (a URL or anything else accepted by hxl.data)
@param schema: the schema to validate against (anything accepted by L{hxl.data}), or None (default) to use the built-in schema.
@returns: a JSON validation report as documented at https://github.com/HXLStandard/hxl-proxy/wiki/Validation-reports
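A minimal sketch of consuming the report (data_url here is a placeholder for a real dataset URL; the keys are those produced by make_json_report()):

    import hxl
    report = hxl.validate(data_url)
    if not report['is_valid']:
        print('{} error(s), {} warning(s)'.format(
            report['stats']['error'], report['stats']['warning']))
        for issue in report['issues']:
            print(issue['description'])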
Classes
- class AbstractRuleTest (callback=None)
- 
Base class for a single test inside a validation rule.
Workflow (triggered by SchemaRule):
- needs_scan()
- start()
- scan_row() for each row (only if needs_scan() returned True)
- scan_cell() for each non-empty matching cell (only if needs_scan() returned True)
- end_scan() (only if needs_scan() returned True)
- validate_dataset()
- validate_row() for each row
- validate_cell() for each matching non-empty cell in each row
- end()
 Set up a schema test.
 @param callback: a callback function to receive error reports
 Subclasses
- ConsistentDatatypesTest
- CorrelationTest
- DatatypeTest
- EnumerationTest
- NumericOutlierTest
- RangeTest
- RegexTest
- RequiredTest
- SpellingTest
- UniqueRowTest
- UniqueValueTest
- WhitespaceTest
 Methods
- def end(self)
- 
Code to run after validating each dataset. Will report errors via the test's callback, if available.
@returns: True if there are no new validation errors
- def end_scan(self)
- 
Clean-up calculations after scanning and before validation. Will be called only if needs_scan() returned True. Does not report any errors.
- def get_indices(self, indices, tag_pattern, columns)
- 
Get a set of column indices by hook or by crook, based on what we have.
- def needs_scan(self)
- 
Report whether this test requires a cached dataset. A cached dataset is one that can be processed more than once. It requires more memory and processing time, so return True only if absolutely necessary. If this method returns True, then the validation engine will call scan_row() for each row in the dataset, scan_cell() for each non-empty matching cell, and end_scan() before invoking any of the validate_* methods.
@returns: True if the test requires a cached dataset.
- def report_error(self, message, row=None, column=None, value=None, suggested_value=None, scope='cell')
- 
Report an error from this test, if there is a callback function available.
- def scan_cell(self, value, row, column)
- 
Pre-scan a single cell to collect information. Will be called for each matching non-empty cell in each row, but only if needs_scan() returns True. This method does not report any errors; it simply collects information for the validate_*() methods to use later.
@param value: the non-empty value to validate
@param row: a hxl.model.Row object for location
@param column: a hxl.model.Column object for location
- def scan_row(self, row, indices=None, tag_pattern=None)
- 
Scan a row of the dataset to collect information. Will be called for each row in the dataset, but only if needs_scan() returns True. This method does not report any errors; it simply collects information for the validate_*() methods to use later.
@param row: the L{hxl.model.Row} object to pre-scan.
@param indices: optional pre-compiled indices for the rule to look at (in lieu of tag_pattern)
@param tag_pattern: optional tag pattern for the rule to use.
- def start(self)
- 
Setup code to run before validating each dataset. This code should not report errors, since it hasn't seen the data yet.
- def validate_cell(self, value, row, column)
- 
Apply test at the cell level. Called for each matching non-empty value. Will also report errors via the test's callback, if available.
@param value: the non-empty value to validate
@param row: a hxl.model.Row object for location
@param column: a hxl.model.Column object for location
@returns: True if there are no new validation errors.
- def validate_dataset(self, dataset, indices=None, tag_pattern=None)
- 
Apply test at the dataset level. Called before validate_row() or validate_cell(). Will report errors via the test's callback, if available.
@param dataset: a hxl.model.Dataset object to validate
@param indices: optional pre-compiled indices for columns matching tag_pattern
@returns: True if there are no new validation errors.
- def validate_row(self, row, indices=None, tag_pattern=None)
- 
Apply test at the row level. Called for each row before validate_cell() calls. Will report errors via the test's callback, if available.
@param row: a hxl.model.Row object to validate
@param indices: optional pre-compiled indices for columns matching tag_pattern
@returns: True if there are no new validation errors.
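As a sketch of extending the API (this example class is invented for illustration, not part of the library), a simple cell-level test only needs to override validate_cell() and report failures through report_error():

    import hxl
    class NoPlaceholderTest(AbstractRuleTest):
        """Illustrative test: reject common placeholder values."""
        PLACEHOLDERS = ('tbd', 'n/a', 'xxx')
        def validate_cell(self, value, row, column):
            # normalise for comparison; report_error() returns False for convenience
            if hxl.datatypes.normalise_string(value) in self.PLACEHOLDERS:
                return self.report_error(
                    "Placeholder value not allowed",
                    value=value, row=row, column=column)
            return True

To make such a test available from a HXL schema, you would also extend Schema.parse() to recognise a new schema hashtag, as the module overview notes.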
 
- class ConsistentDatatypesTest (callback=None)
- 
Check for consistent datatypes in a column.
HXL: #valid_datatype+consistent
Will report all but the most-common datatype as errors. Has special knowledge of the #date hashtag.
 Set up a schema test.
 @param callback: a callback function to receive error reports
 Ancestors
- AbstractRuleTest
 Methods
- def end_scan(self)
- 
Reduce the datatype_map to the most-common type for each tagspec.
- def guess_type(self, value, column)
- 
Guess the type of a value.
- def needs_scan(self)
- 
We want to prescan the dataset.
- def validate_cell(self, value, row, column)
- 
Compare the cell's datatype against the most-common datatype recorded for its column during the scan, and report an inconsistency error on a mismatch.
@returns: True if the datatype is consistent with the column's most-common type
 Inherited members
- class CorrelationTest (tag_patterns)
- 
Test for correlations with other values.
HXL schema: #valid_correlation
Supply a list of tag patterns, and report any outliers that don't correlate with those columns.
TODO: this might be more efficient with pre-scanning.
 Constructor
 @param tag_patterns: a list of tag patterns for the correlation
 Ancestors
- AbstractRuleTest
 Methods
- def end(self)
- 
All the error reporting happens here. We don't know the most-common correlations for each key until the end. Uses the most-common value for each correlation key as the suggested value for the others.
- def start(self)
- 
Build an empty correlation map.
- def validate_row(self, row, indices=None, tag_pattern=None)
- 
Row validation always succeeds. We use this method to capture the correlation keys, so that we can report on them in the end() method.
 Inherited members
- class DatatypeTest (datatype)
- 
Test for a specified datatype.
HXL schema: #valid_datatype-consistent
See also ConsistentDatatypesTest, which infers the most-common datatype.
 Constructor
 @param datatype: a string specifying the datatype (e.g. "number")
 Ancestors
- AbstractRuleTest
 Class variables
- var DATATYPES
- 
The allowed datatype names: 'text', 'number', 'url', 'email', 'phone', and 'date'
 Methods
- def validate_cell(self, value, row, column)
- 
Validate datatypes on the individual cell level.
 Inherited members
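An illustrative check (row and column are used only for error locations, so None is acceptable in a quick test):

    test = DatatypeTest('number')
    test.validate_cell('3.14', None, None)    # True
    test.validate_cell('three', None, None)   # False ("Expected a number" goes to the callback, if set)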
- class EnumerationTest (allowed_values, case_sensitive=False)
- 
Test against a list of enumerated values.
HXL schema: #valid_value+list #valid_value+url #valid_value+case
This test can also hold an extra error to report if there was a problem, e.g. reading the values externally.
 Constructor
 @param allowed_values: sequence of allowed values
 @param case_sensitive: if True, make comparisons case-sensitive
 Ancestors
- AbstractRuleTest
 Methods
- def get_suggested_value(self, value)
- 
Return a cached suggested replacement for a disallowed value, using find_closest_match() over the allowed-value set.
- def setup_tables(self, allowed_values)
- 
Build the lookup tables of allowed values, normalising for whitespace and (if configured) case.
 Inherited members
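An illustrative use (the values are invented; with no callback set, the return value alone signals failure):

    test = EnumerationTest(['Kenya', 'Uganda'], case_sensitive=False)
    test.validate_cell('uganda', None, None)   # True (case-insensitive match)
    test.validate_cell('Kenia', None, None)    # False, with 'Kenya' as the suggested value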
- class HXLValidationException (message, rule=None, value=None, row=None, column=None, suggested_value=None, scope='cell', is_external=False)
- 
Data structure to hold a HXL validation error. Normally, this exception isn't thrown, but is passed as a parameter to callbacks via Schema, SchemaRule, and classes extending AbstractRuleTest.
 Construct a new validation error report.
 @param message: the text message for the error
 @param rule: the rule associated with the error (it may have a more-general descriptive message)
 @param value: the value that triggered the error, if available
 @param row: the hxl.model.Row object associated with the error, if any
 @param column: the hxl.model.Column object associated with the error, if any
 @param suggested_value: the suggested replacement value, if known
 @param scope: the error scope (dataset, column, row, or cell)
 @param is_external: if True, the error is external to the data itself
 Ancestors
- HXLException
- builtins.Exception
- builtins.BaseException
 Class variables
- var SCOPES
- 
Allowed values for the error scope 
 Inherited members
- class NumericOutlierTest (callback=None)
- 
Detect outliers among matching values. Will skip any tagspec with a coefficient of variation > 1.0.
 Set up a schema test.
 @param callback: a callback function to receive error reports
 Ancestors
- AbstractRuleTest
 Inherited members
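A rough sketch of the cutoff logic with invented numbers: a value is flagged when it lies more than three standard deviations from the mean, unless the column's coefficient of variation (stddev/mean) exceeds 1.0, in which case the whole column is skipped as too noisy:

    mean, stddev = 100.0, 10.0                    # computed during the scan phase
    value = 145.0
    is_outlier = abs(value - mean) > 3 * stddev   # True: 45 > 30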
- class RangeTest (min_value=None, max_value=None)
- 
Test for a range of numbers or strings.
HXL schema: #valid_value+min and #valid_value+max
This class will try to determine whether to use date sorting, numeric sorting, or lexical sorting to decide what is within a range. Ranges are always case-insensitive.
 Constructor
 @param min_value: the minimum allowed value, or None for no minimum
 @param max_value: the maximum allowed value, or None for no maximum
 Ancestors
- AbstractRuleTest
 Methods
- def validate_cell(self, value, row, column)
- 
Test that a value is >= min_value and/or <= max_value. Includes special handling for numbers and dates.
 Inherited members
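An illustrative numeric range check (hxl.model.Column.parse() is assumed here as a way to build a column object, since validate_cell() consults column.tag for date handling):

    test = RangeTest(min_value='0', max_value='100')
    column = hxl.model.Column.parse('#affected')
    test.validate_cell('50', None, column)    # True
    test.validate_cell('150', None, column)   # False ("Value must not be more than 100")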
- class RegexTest (regex, case_sensitive=True)
- 
Test that non-empty values match a regular expression.
HXL schema: #valid_value+regex
The regex is unanchored, so use '^' or '$' to anchor if needed. Whitespace is not normalised before matching.
 Constructor
 @param regex: the regular expression to test against
 @param case_sensitive: if True (default), matches are case-sensitive
 Ancestors
- AbstractRuleTest
 Methods
- def validate_cell(self, value, row, column)
- 
Match value (including whitespace) against the regular expression Expand source codedef validate_cell(self, value, row, column): """Match value (including whitespace) against the regular expression""" if self.regex.search(value): return True else: return self.report_error( 'Should match regular expression /{}/'.format(self.regex_text), value=value, row=row, column=column )
 Inherited members
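As a sketch, a rule row like the following (the pattern and tag are hypothetical) becomes a RegexTest; the anchors are explicit because validate_cell() uses re.search():

    import hxl

    SCHEMA = [
        ['#valid_tag', '#valid_value+regex', '#valid_value+case'],
        ['#adm1+code', '^[A-Z]{2}[0-9]{4}$', 'true'],  # whole value must match
    ]
    DATA = [
        ['#adm1+code'],
        ['AB1234'],   # passes
        ['ab1234'],   # fails: the match is case-sensitive
    ]
    print(hxl.schema(SCHEMA).validate(hxl.data(DATA)))  # False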
- class RequiredTest (min_occurs=None, max_occurs=None)
- 
Test min/max occurrence
HXL schema: #valid_required
If the columns don't exist at all, report only a single error. Otherwise, report an error for each row where the test fails.
Constructor
@param min_occurs: minimum occurrence required (or None)
@param max_occurs: maximum occurrence allowed (or None)
Expand source code

class RequiredTest(AbstractRuleTest):
    """Test min/max occurrence
    HXL schema: #valid_required
    If the columns don't exist at all, report only a single error.
    Otherwise, report an error for each row where the test fails.
    """

    def __init__(self, min_occurs=None, max_occurs=None):
        """Constructor
        @param min_occurs: minimum occurrence required (or None)
        @param max_occurs: maximum occurrence allowed (or None)
        """
        super().__init__()
        self.min_occurs = min_occurs
        self.max_occurs = max_occurs
        self.test_rows = True

    def start(self):
        self.test_rows = True

    def validate_dataset(self, dataset, indices=None, tag_pattern=None):
        """Verify that we have enough matching columns to satisfy the test"""
        status = True
        indices = self.get_indices(indices, tag_pattern, dataset.columns)
        if self.min_occurs is not None and len(indices) < self.min_occurs:
            self.test_rows = False # no point testing individual rows
            status = self.report_error(
                "Expected at least {} matching column(s)".format(self.min_occurs),
                scope='dataset'
            )
        return status

    def validate_row(self, row, indices=None, tag_pattern=None):
        """Check the number of non-empty occurrences in a row."""
        if not self.test_rows: # skip if there aren't enough columns
            return
        status = True
        indices = self.get_indices(indices, tag_pattern, row.columns)

        non_empty_count = 0
        first_empty_column = None
        last_nonempty_column = None

        # iterate through all values in matching columns
        for i in indices:
            if i >= len(row.values) or hxl.datatypes.is_empty(row.values[i]):
                if first_empty_column is None:
                    first_empty_column = row.columns[i]
            else:
                non_empty_count += 1
                last_nonempty_column = row.columns[i]

        if self.min_occurs is not None and non_empty_count < self.min_occurs:
            status = self.report_error(
                "Expected at least {} matching non-empty value(s)".format(self.min_occurs),
                row=row, column=first_empty_column, scope='row'
            )
        if self.max_occurs is not None and non_empty_count > self.max_occurs:
            status = self.report_error(
                "Expected at most {} matching non-empty value(s)".format(self.max_occurs),
                row=row, column=last_nonempty_column, scope='row'
            )
        return status

Ancestors
- AbstractRuleTest

Methods
- def validate_dataset(self, dataset, indices=None, tag_pattern=None)
- 
Verify that we have enough matching columns to satisfy the test
- def validate_row(self, row, indices=None, tag_pattern=None)
- 
Check the number of non-empty occurrences in a row.
 Inherited members
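A sketch of the occurrence bounds in use; the tags and counts are illustrative. A plain #valid_required column maps to RequiredTest(min_occurs=1), while +min/+max set explicit bounds:

    import hxl

    SCHEMA = [
        ['#valid_tag', '#valid_required+min', '#valid_required+max'],
        ['#org',       '1',                   '2'],  # 1-2 non-empty #org values per row
    ]
    DATA = [
        ['#org',  '#org+partner'],
        ['Org A', 'Org B'],  # passes: two non-empty values
        ['',      ''],       # fails: no non-empty values
    ]
    print(hxl.schema(SCHEMA).validate(hxl.data(DATA)))  # False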
- class Schema (callback=None)
- 
Schema against which to validate a HXL document. Consists of a sequence of SchemaRule objects to apply to the data. The validate() method triggers the following:
- start()
- validate_dataset()
- validate_row() for each row
- validate_cell() for each matching non-empty cell in each row
- end()
Add new rules with schema.rules.append(rule)
Constructor
@param callback: a callback function to receive HXLValidationException objects as error reports
Expand source code

class Schema(object):
    """Schema against which to validate a HXL document.
    Consists of a sequence of SchemaRule objects to apply to the
    data. The validate() method triggers the following:
    - start()
    - validate_dataset()
    - validate_row() for each row
    - validate_cell() for each matching non-empty cell in each row
    - end()
    Add new rules with schema.rules.append(rule)
    """

    def __init__(self, callback=None):
        """Constructor
        @param callback: a callback function to receive HXLValidationException objects as error reports
        """
        self.rules = []
        """Rules making up this schema"""
        self.callback = callback
        """Callback function to receive error reports"""

    def validate(self, source):
        """Execute the main validation workflow.
        @param source: the hxl.model.Dataset to validate
        """
        status = True # all is well at the beginning
        needs_scan = False # assume we don't need a pre-scan

        # do we need a cached, in-memory dataset?
        for rule in self.rules:
            if rule.needs_scan():
                needs_scan = True
                if not source.is_cached:
                    source = source.cache()
                break

        # initial setup
        self.start()

        # pre-scan if needed
        if needs_scan:
            for row in source:
                self.scan_row(row)
            self.end_scan()

        # dataset-level validations
        if not self.validate_dataset(source):
            status = False

        # row-level validations
        # (will also include cell-level validations)
        for row in source:
            if not self.validate_row(row):
                status = False

        # finalisation
        if not self.end():
            status = False

        return status

    def start(self):
        """Initialise the validation run"""
        def rule_callback(e):
            """Relay rule callbacks"""
            if self.callback:
                self.callback(e)
        for rule in self.rules:
            rule.callback = rule_callback
            rule.start()

    def end(self):
        """Terminate the validation run"""
        status = True
        for rule in self.rules:
            if not rule.end():
                status = False
        return status

    def scan_row(self, row):
        """Pre-scan a row, only for rules that require it."""
        for rule in self.rules:
            if rule.needs_scan():
                rule.scan_row(row)

    def end_scan(self):
        """End pre-scan, for rules that require it."""
        for rule in self.rules:
            if rule.needs_scan():
                rule.end_scan()

    def validate_dataset(self, dataset):
        """Validate just at the dataset level
        @param dataset: the hxl.model.Dataset object to validate
        """
        status = True
        for rule in self.rules:
            if not rule.validate_dataset(dataset):
                status = False
        return status

    def validate_row(self, row):
        """Validate at the row and cell levels.
        Each rule will handle cell-level validation on its own,
        because it knows what columns to look at.
        @param row: the row to validate
        """
        status = True
        for rule in self.rules:
            if not rule.validate_row(row):
                status = False
        return status

    def __str__(self):
        """String representation of a schema (for debugging)"""
        s = "<HXL schema\n"
        for rule in self.rules:
            s += "  " + str(rule) + "\n"
        s += ">"
        return s

    @staticmethod
    def parse(source=None, callback=None):
        """Load a HXL schema from the provided input stream, or load the default schema.
        @param source: HXL data source for the schema (e.g. a HXLReader or filter); defaults to the built-in schema
        @param callback: a callback function for reporting errors (receives a HXLValidationException)
        """

        # Catch special cases

        if source is None:
            # Use the built-in default schema and recurse
            path = os.path.join(os.path.dirname(__file__), 'hxl-default-schema.json')
            with hxl.data(path, hxl.InputOptions(allow_local=True)) as source:
                return Schema.parse(source, callback)

        if isinstance(source, Schema):
            # Already a schema; set the callback and return it
            source.callback = callback
            return source

        if not isinstance(source, hxl.model.Dataset):
            # Not already a dataset, so wrap it and recurse
            with hxl.data(source) as source:
                return Schema.parse(source, callback)

        # Main parsing

        schema = Schema(callback=callback)

        def parse_type(type):
            if type:
                type = type.lower()
                type = re.sub(r'[^a-z_-]', '', type) # normalise
            if type in SchemaRule.DATATYPES:
                return type
            else:
                return None

        def to_int(s):
            if s:
                return int(s)
            else:
                return None

        def to_float(s):
            if s:
                return float(s)
            else:
                return None

        def to_boolean(s):
            if not s or s.lower() in ['0', 'n', 'no', 'f', 'false']:
                return False
            elif s.lower() in ['y', 'yes', 't', 'true']:
                return True
            else:
                raise hxl.HXLException('Unrecognised true/false value: {}'.format(s))

        for row in source:
            tags = row.get('#valid_tag')
            if tags:
                tag_patterns = hxl.model.TagPattern.parse_list(tags)
                for tag_pattern in tag_patterns:
                    rule = SchemaRule(tag_pattern)
                    rule.severity = row.get('#valid_severity') or 'error'
                    rule.description = row.get('#description')

                    # for later use
                    case_sensitive = to_boolean(row.get('#valid_value+case'))

                    if to_boolean(row.get('#valid_required-min-max')):
                        rule.tests.append(RequiredTest(min_occurs=1, max_occurs=None))

                    min_occurs = to_int(row.get('#valid_required+min'))
                    max_occurs = to_int(row.get('#valid_required+max'))
                    if min_occurs is not None or max_occurs is not None:
                        rule.tests.append(RequiredTest(min_occurs=min_occurs, max_occurs=max_occurs))

                    datatype = row.get('#valid_datatype-consistent')
                    if datatype is not None:
                        rule.tests.append(DatatypeTest(datatype))

                    min_value = row.get('#valid_value+min')
                    max_value = row.get('#valid_value+max')
                    if min_value is not None or max_value is not None:
                        rule.tests.append(RangeTest(min_value=min_value, max_value=max_value))

                    if to_boolean(row.get('#valid_value+whitespace')):
                        rule.tests.append(WhitespaceTest())

                    regex = row.get('#valid_value+regex')
                    if regex is not None:
                        rule.tests.append(RegexTest(regex, case_sensitive))

                    if to_boolean(row.get('#valid_value+spelling')):
                        rule.tests.append(SpellingTest(case_sensitive=case_sensitive))

                    if to_boolean(row.get('#valid_unique-key')):
                        rule.tests.append(UniqueValueTest())

                    key = row.get('#valid_unique+key')
                    if hxl.datatypes.is_truthy(key): # could be problematic if there's ever a hashtag like #true or #yes
                        rule.tests.append(UniqueRowTest())
                    elif not hxl.datatypes.is_empty(key):
                        rule.tests.append(UniqueRowTest(key))

                    correlations = row.get('#valid_correlation')
                    if not hxl.datatypes.is_empty(correlations):
                        rule.tests.append(CorrelationTest(correlations))

                    if to_boolean(row.get('#valid_datatype+consistent')):
                        rule.tests.append(ConsistentDatatypesTest())

                    if to_boolean(row.get('#valid_value+outliers')):
                        rule.tests.append(NumericOutlierTest())

                    l = row.get('#valid_value+list')
                    if not hxl.datatypes.is_empty(l):
                        allowed_values = re.split(r'\s*\|\s*', l)
                        if len(allowed_values) > 0:
                            rule.tests.append(EnumerationTest(allowed_values, case_sensitive))

                    url = row.get('#valid_value+url')
                    if not hxl.datatypes.is_empty(url):
                        # default the target tag to the #valid_tag
                        target_tag = row.get('#valid_value+target_tag', default=tag_pattern)
                        try:
                            # read the values from an external dataset
                            source = hxl.data(url)
                            allowed_values = source.get_value_set(target_tag)
                            if len(allowed_values) > 0:
                                rule.tests.append(EnumerationTest(allowed_values, case_sensitive))
                        except BaseException as error:
                            # don't add the test to the rule
                            # do add an error about loading the values
                            rule.external_errors.append(HXLValidationException(
                                'Error loading allowed values from {}: {}'.format(url, error.args[0]),
                                scope='dataset',
                                rule=rule,
                                is_external=True
                            ))

                    schema.rules.append(rule)

        return schema

Static methods
- def parse(source=None, callback=None)
- 
Load a HXL schema from the provided input stream, or load the default schema.
@param source: HXL data source for the schema (e.g. a HXLReader or filter); defaults to the built-in schema
@param callback: a callback function for reporting errors (receives a HXLValidationException)
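Because parse() wraps any non-Dataset source with hxl.data() and recurses, a schema can be defined inline; calling it with no source loads the built-in default schema. A small sketch with a hypothetical rule:

    import hxl.validation

    # one #valid_tag row -> one SchemaRule carrying two tests
    schema = hxl.validation.Schema.parse([
        ['#valid_tag', '#valid_required', '#valid_value+list'],
        ['#status',    'true',            'active|inactive'],
    ])
    print(len(schema.rules))           # 1
    print(len(schema.rules[0].tests))  # 2 (RequiredTest + EnumerationTest)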
Instance variables
- var callback
- 
Callback function to receive error reports 
- var rules
- 
Rules making up this schema 
Methods
- def end(self)
- 
Terminate the validation run
- def end_scan(self)
- 
End pre-scan, for rules that require it.
- def scan_row(self, row)
- 
Pre-scan a row, only for rules that require it.
- def start(self)
- 
Initialise the validation run
- def validate(self, source)
- 
Execute the main validation workflow.
@param source: the hxl.model.Dataset to validate
- def validate_dataset(self, dataset)
- 
Validate just at the dataset level
@param dataset: the hxl.model.Dataset object to validate
- def validate_row(self, row)
- 
Validate at the row and cell levels. Each rule will handle cell-level validation on its own, because it knows what columns to look at.
@param row: the row to validate
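The workflow above can also be driven without a schema spreadsheet at all, using the documented schema.rules.append(rule) pattern. A hedged sketch; the tag and range are examples:

    import hxl
    import hxl.validation as validation

    schema = validation.Schema(callback=lambda e: print('error:', e.message))
    rule = validation.SchemaRule('#affected')
    rule.tests.append(validation.RangeTest(min_value='0'))
    schema.rules.append(rule)

    ok = schema.validate(hxl.data([
        ['#org',  '#affected'],
        ['Org A', '-5'],  # out of range, so the callback fires
    ]))
    print(ok)  # False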
 
- class SchemaRule (tag_pattern, severity='error', description=None, callback=None)
- 
A single rule within a schema. A rule contains one or more tests, together with some common metadata (a tag pattern, severity level, and description). If any test fails, then the whole rule fails. Workflow (triggered by Schema.validate()):
- needs_scan()
- start()
- scan_row() for each row (if needs_scan() returned True)
- scan_cell() for each matching non-empty cell (if needs_scan() returned True)
- end_scan() (if needs_scan() returned True)
- validate_dataset()
- validate_row() for each row
- validate_cell() for each matching non-empty cell in each row
- end()
Constructor
@param tag_pattern: the tag pattern to match for the rule
@param severity: one of 'info', 'warning', or 'error' (default)
@param description: an optional error message to override the default from the tests
@param callback: an optional callback function to handle error reports
Expand source code

class SchemaRule(object):
    """A single rule within a schema.
    A rule contains one or more tests, together with some common
    metadata (a tag pattern, severity level, and description). If any
    test fails, then the whole rule fails.
    Workflow (triggered by Schema.validate()):
    - needs_scan()
    - start()
    - scan_row() for each row (if needs_scan() returned True)
    - scan_cell() for each matching non-empty cell (if needs_scan() returned True)
    - end_scan() (if needs_scan() returned True)
    - validate_dataset()
    - validate_row() for each row
    - validate_cell() for each matching non-empty cell in each row
    - end()
    """

    SEVERITY = ('info', 'warning', 'error',)

    def __init__(self, tag_pattern, severity="error", description=None, callback=None):
        """Constructor
        @param tag_pattern: the tag pattern to match for the rule
        @param severity: one of 'info', 'warning', or 'error' (default)
        @param description: an optional error message to override the default from the tests
        @param callback: an optional callback function to handle error reports
        """
        self.tag_pattern = hxl.TagPattern.parse(tag_pattern)
        self.description = description
        self.callback = callback

        # make sure the severity level is valid
        severity = hxl.datatypes.normalise_string(severity)
        if severity in SchemaRule.SEVERITY:
            self.severity = severity
        else:
            raise hxl.HXLException("Unsupported rule severity level: {}".format(severity))

        # Additional internal variables
        self.tests = []
        """List of AbstractRuleTest objects to apply as part of this rule"""
        self.external_errors = []
        """Errors external to the dataset itself (e.g. missing taxonomies)"""
        self.saved_indices = None
        """List of saved column indices matching tag_pattern"""

    def needs_scan(self):
        """Check if any test in this rule requires a pre-scan of the dataset.
        @returns: True if at least one test's needs_scan() method returns True
        """
        for test in self.tests:
            if test.needs_scan():
                return True
        return False

    def start(self):
        """Initialisation method
        Call after all values have been set, but before use
        """
        self.saved_indices = None

        def test_callback(e):
            """Relay error reports from tests, with rule-level context added"""
            e.rule = self
            if self.callback:
                self.callback(e)

        # (re)initialise all the tests
        for test in self.tests:
            test.callback = test_callback # call back to here
            test.start()

    def end(self):
        """Call at end of parse to get post-parse errors"""
        status = True
        # finish all the tests
        for test in self.tests:
            if not test.end():
                status = False
        self.saved_indices = None
        return status

    def scan_row(self, row):
        """Pre-scan a row and its individual cells.
        This method does not report errors or return a status.
        Will be invoked only if needs_scan() returned True
        Calls both scan_row() and scan_cell() for each test.
        @param row: the Row to scan
        """
        if self.saved_indices is None:
            self.saved_indices = hxl.model.get_column_indices([self.tag_pattern], row.columns)
        # scan each row, then each matching cell in the row
        for test in self.tests:
            if not test.needs_scan(): # don't invoke unless the test asked for it
                continue
            test.scan_row(row, self.saved_indices)
            for i in self.saved_indices:
                # scan individual cells
                if i < len(row.values) and not hxl.datatypes.is_empty(row.values[i]):
                    test.scan_cell(row.values[i], row, row.columns[i])

    def end_scan(self):
        """Invoke end_scan() for all tests that need it"""
        for test in self.tests:
            if test.needs_scan():
                test.end_scan()

    def validate_dataset(self, dataset, indices=None, tag_pattern=None):
        """Test whether the columns are present to satisfy this rule."""
        status = True
        if self.saved_indices is None:
            self.saved_indices = hxl.model.get_column_indices([self.tag_pattern], dataset.columns)

        # Report any external errors
        for error in self.external_errors:
            if self.callback:
                self.callback(error)

        # run each of the tests
        for test in self.tests:
            if not test.validate_dataset(dataset, indices=self.saved_indices):
                status = False
        return status

    def validate_row(self, row):
        """Apply the rule to an entire Row
        @param row: the Row to validate
        @return: True if all matching values in the row are valid
        """
        # individual rules may change to False
        status = True
        if self.saved_indices is None:
            self.saved_indices = hxl.model.get_column_indices([self.tag_pattern], row.columns)
        # run each test on the complete row, then on individual cells
        for test in self.tests:
            if not test.validate_row(row, self.saved_indices):
                status = False
            for i in self.saved_indices:
                # validate individual cells
                if i < len(row.values) and not hxl.datatypes.is_empty(row.values[i]):
                    if not test.validate_cell(row.values[i], row, row.columns[i]):
                        status = False
        return status

    def __str__(self):
        """String representation of a rule (for debugging)"""
        return "<HXL schema rule: " + str(self.tag_pattern) + ">"

Class variables
- var SEVERITY
Instance variables
- var external_errors
- 
Errors external to the dataset itself (e.g. missing taxonomies) 
- var saved_indices
- 
List of saved column indices matching tag_pattern 
- var tests
- 
List of AbstractRuleTest objects to apply as part of this rule 
Methods
- def end(self)
- 
Call at end of parse to get post-parse errors
- def end_scan(self)
- 
Invoke end_scan() for all tests that need it
- def needs_scan(self)
- 
Check if any test in this rule requires a pre-scan of the dataset.
@returns: True if at least one test's needs_scan() method returns True
- def scan_row(self, row)
- 
Pre-scan a row and its individual cells. This method does not report errors or return a status, and will be invoked only if needs_scan() returned True. Calls both scan_row() and scan_cell() for each test.
@param row: the Row to scan
- def start(self)
- 
Initialisation method. Call after all values have been set, but before use.
- def validate_dataset(self, dataset, indices=None, tag_pattern=None)
- 
Test whether the columns are present to satisfy this rule.
- def validate_row(self, row)
- 
Apply the rule to an entire Row
@param row: the Row to validate
@return: True if all matching values in the row are valid
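For illustration, a warning-level rule built directly; the severity is checked against SchemaRule.SEVERITY, and an unsupported level raises hxl.HXLException. The tag and description are hypothetical:

    import hxl.validation as validation

    rule = validation.SchemaRule(
        '#adm1+name',
        severity='warning',   # must be one of SchemaRule.SEVERITY
        description='Admin 1 names should use regular spacing',
    )
    rule.tests.append(validation.WhitespaceTest())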
 
- class SpellingTest (case_sensitive=False)
- 
Detect spelling outliers in a column
HXL schema: #valid_value+spelling
Will treat numbers and dates as strings, so use this only in columns where you expect text, and frequently-repeated values (e.g. #status, #org+name, #sector+name). Will skip validation if the coefficient of variation > 1.0
Collects all of the spelling variants first, then checks the rare ones in the end() method, and reports any that have near matches among the common ones.
Constructor
@param case_sensitive: if True, differences in case are considered errors (default False)
Expand source code

class SpellingTest(AbstractRuleTest):
    """Detect spelling outliers in a column
    HXL schema: #valid_value+spelling
    Will treat numbers and dates as strings, so use this only in
    columns where you expect text, and frequently-repeated values
    (e.g. #status, #org+name, #sector+name). Will skip validation if
    the coefficient of variation > 1.0
    Collects all of the spelling variants first, then checks the rare
    ones in the end() method, and reports any that have near matches
    among the common ones.
    """

    ERROR_CUTOFF = 0.05 # 5%
    """Cutoff for reporting a possible error, as percentage of mean frequency for each spelling"""

    def __init__(self, case_sensitive=False):
        """Constructor
        @param case_sensitive: if True, differences in case are considered errors (default False)
        """
        super().__init__()
        self.case_sensitive = case_sensitive

    def start(self):
        """Set up for a validation run"""
        # spelling -> locations
        self.spelling_map = dict()
        """Store spellings and locations by tagspec"""
        self.total_occurrences = dict()
        """Count the total spelling occurrences, for mean and standard deviation, by tagspec"""

    def end(self):
        """Report possible spelling errors.
        Collect all spellings that occur at no more than ERROR_CUTOFF
        (5%) of the mean frequency. For each of these, check whether
        there's a close match among the more-common spellings, and if
        so, then report (once for each location) with the suggested
        correction.
        """
        # start by assuming all is well
        status = True

        for tagspec, spellings in self.spelling_map.items():

            # cache corrections so that we don't keep looking up the same ones
            correction_cache = {}

            # if there aren't any spellings, then we're done
            if len(spellings) == 0:
                break

            # get the average (mean) occurrences for each spelling
            mean_frequency = self.total_occurrences[tagspec] / len(spellings)

            # calculate the coefficient of variation (dimensionless)
            standard_deviation = math.sqrt(
                sum(map(lambda n: (len(n) - mean_frequency)**2, spellings.values())) / len(spellings)
            )
            variance_coefficient = standard_deviation / mean_frequency

            # there's no point spelling checking unless the variance coefficient is low enough to be meaningful
            if variance_coefficient > 1.0:
                break

            # first pass: collect and clear good spellings
            good_spellings = list()
            for spelling, locations in spellings.items():
                if len(locations) > mean_frequency * SpellingTest.ERROR_CUTOFF:
                    good_spellings.append(spelling)
                    spellings[spelling] = None # no potential errors

            # second pass: report any remaining dubious spellings that have close matches among good spellings
            for spelling, locations in spellings.items():
                if locations is None:
                    # this spelling was OK
                    continue
                # is there a near match among good spellings?
                if spelling in correction_cache:
                    correction = correction_cache[spelling]
                else:
                    correction = find_closest_match(spelling, good_spellings)
                    correction_cache[spelling] = correction
                if correction is not None:
                    # if it's rare and there's a near match, report an error
                    status = False
                    for location in locations:
                        self.report_error(
                            'Possible spelling error',
                            value=location[2],
                            row=location[0],
                            column=location[1],
                            suggested_value=correction,
                            scope='cell'
                        )

        return status # false if we've found a possible correction

    def validate_cell(self, value, row, column):
        """Record all the spellings found, for later checking"""
        tagspec = column.get_display_tag(sort_attributes=True) # FIXME
        if self.case_sensitive:
            cooked_value = hxl.datatypes.normalise_space(value)
        else:
            cooked_value = hxl.datatypes.normalise_string(value)
        self.total_occurrences[tagspec] = self.total_occurrences.setdefault(tagspec, 0) + 1
        self.spelling_map.setdefault(tagspec, {}).setdefault(cooked_value, []).append((row, column, value,))
        return True

Ancestors
- AbstractRuleTest

Class variables
- var ERROR_CUTOFF
- 
Cutoff for reporting a possible error, as percentage of mean frequency for each spelling 
Methods
- def end(self)
- 
Report possible spelling errors. Collect all spellings that occur at no more than ERROR_CUTOFF (5%) of the mean frequency. For each of these, check whether there's a close match among the more-common spellings, and if so, then report (once for each location) with the suggested correction.
- def start(self)
- 
Set up for a validation run
- def validate_cell(self, value, row, column)
- 
Record all the spellings found, for later checking
 Inherited members
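A sketch of the intended effect, with an invented dataset: many consistent values plus one rare variant should be reported with the common spelling as the suggested correction, provided find_closest_match() considers them near matches:

    import hxl

    SCHEMA = [
        ['#valid_tag', '#valid_value+spelling'],
        ['#sector',    'true'],
    ]
    # 50 'Education' rows keep the rare variant under the 5% cutoff
    DATA = [['#sector']] + [['Education']] * 50 + [['Educaton']]
    print(hxl.schema(SCHEMA).validate(hxl.data(DATA)))  # expect False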
- class UniqueRowTest (tag_patterns=None)
- 
Test for duplicate rows, optionally using a list of tag patterns as a key
HXL schema: #valid_unique+key
If there are no tag patterns provided, uses the entire row to make the key. Note that the target tag pattern (#valid_tag) is irrelevant for this test.
Constructor
If no tag patterns are supplied, test the whole row.
@param tag_patterns: list of tag patterns to test
Expand source code

class UniqueRowTest(AbstractRuleTest):
    """Test for duplicate rows, optionally using a list of tag patterns as a key
    HXL schema: #valid_unique+key
    If there are no tag patterns provided, uses the entire row to make the key.
    Note that the target tag pattern (#valid_tag) is irrelevant for this test.
    """

    def __init__(self, tag_patterns=None):
        """Constructor
        If no tag patterns are supplied, test the whole row.
        @param tag_patterns: list of tag patterns to test
        """
        super().__init__()
        if tag_patterns is not None:
            self.tag_patterns = hxl.model.TagPattern.parse_list(tag_patterns)
        else:
            self.tag_patterns = None

    def start(self):
        self.keys_seen = set() # create the empty key set

    def end(self):
        self.keys_seen = None # free some memory
        return True

    def validate_row(self, row, indices=[], tag_pattern=None):
        key = row.key(self.tag_patterns)
        if key in self.keys_seen:
            return self.report_error(
                'Duplicate row according to key values {}'.format(str(key)),
                row=row,
                scope='row'
            )
        else:
            self.keys_seen.add(key)
            return True

Ancestors
- AbstractRuleTest

Inherited members
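A sketch of key-based deduplication, assuming #valid_unique+key takes a comma-separated tag-pattern list (it is parsed with TagPattern.parse_list()); tags and values are illustrative:

    import hxl

    SCHEMA = [
        ['#valid_tag', '#valid_unique+key'],
        ['#org',       '#org,#adm1'],  # key on the (#org, #adm1) pair
    ]
    DATA = [
        ['#org',  '#adm1'],
        ['Org A', 'Coast'],
        ['Org A', 'Plains'],
        ['Org A', 'Coast'],  # repeated key, so a row-scope error
    ]
    print(hxl.schema(SCHEMA).validate(hxl.data(DATA)))  # False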
- class UniqueValueTest (callback=None)
- 
Test that individual values are unique
HXL schema: #valid_unique-key
Every value in any column matching tag_pattern must be unique. Normalises case and whitespace before testing, so "Aaa" and " aaa" would count as duplicates.
Set up a schema test.
@param callback: a callback function to receive error reports
Expand source code

class UniqueValueTest(AbstractRuleTest):
    """Test that individual values are unique
    HXL schema: #valid_unique-key
    Every value in any column matching tag_pattern must be unique.
    Normalises case and whitespace before testing, so "Aaa" and
    " aaa" would count as duplicates.
    """

    def start(self):
        self.values_seen = set() # create the empty value set

    def end(self):
        self.values_seen = None # free some memory
        return True

    def validate_cell(self, value, row, column):
        """Report an error if we see the same (normalised) value more than once"""
        norm_value = hxl.datatypes.normalise_string(value)
        if norm_value in self.values_seen:
            return self.report_error(
                "Duplicate value",
                value=value, row=row, column=column
            )
        else:
            self.values_seen.add(norm_value)
            return True

Ancestors
- AbstractRuleTest

Methods
- def validate_cell(self, value, row, column)
- 
Report an error if we see the same (normalised) value more than once
 Inherited members
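Sketch: a bare #valid_unique column (the -key form above) enables UniqueValueTest; because of normalisation, values differing only in case or surrounding whitespace still collide. The tag is an example:

    import hxl

    SCHEMA = [
        ['#valid_tag', '#valid_unique'],
        ['#org+code',  'true'],
    ]
    DATA = [
        ['#org+code'],
        ['UNICEF'],
        [' unicef '],  # duplicate after case/whitespace normalisation
    ]
    print(hxl.schema(SCHEMA).validate(hxl.data(DATA)))  # False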
- class WhitespaceTest (callback=None)
- 
Test for irregular whitespace in a cell
HXL schema: #valid_value+whitespace
Irregular whitespace is any leading or trailing space, or anything but a single space character inside a string.
Set up a schema test.
@param callback: a callback function to receive error reports
Expand source code

class WhitespaceTest(AbstractRuleTest):
    """Test for irregular whitespace in a cell
    HXL schema: #valid_value+whitespace
    Irregular whitespace is any leading or trailing space, or
    anything but a single space character inside a string.
    """

    PATTERN = r'^(\s+.*|.*(\s\s|[\t\r\n]).*|.*\s+)$'
    """Regular expression to detect irregular whitespace"""

    def validate_cell(self, value, row, column):
        """Is there irregular whitespace?"""
        if hxl.datatypes.is_string(value):
            if re.match(WhitespaceTest.PATTERN, value):
                return self.report_error(
                    'Found extra whitespace',
                    value=value, row=row, column=column,
                    suggested_value=hxl.datatypes.normalise_space(value)
                )
        return True

Ancestors
- AbstractRuleTest

Class variables
- var PATTERN
- 
Regular expression to detect irregular whitespace 
Methods
- def validate_cell(self, value, row, column)
- 
Is there irregular whitespace?
 Inherited members
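Finally, a sketch of the whitespace check; the error report's suggested_value is the normalised string. The tag is an example:

    import hxl

    SCHEMA = [
        ['#valid_tag', '#valid_value+whitespace'],
        ['#adm1+name', 'true'],
    ]
    DATA = [
        ['#adm1+name'],
        ['Coast Province'],     # passes
        ['  Coast  Province'],  # fails: leading and doubled spaces
    ]
    print(hxl.schema(SCHEMA).validate(hxl.data(DATA)))  # False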