Module hxl.formulas.functions

Functions that can run inside a formula

Expand source code
"""Functions that can run inside a formula
"""

import logging, collections
import hxl.datatypes
import datetime

from hxl.util import logup

logger = logging.getLogger(__name__)

#
# Operators (not directly callable as functions, but see below)
#

def const(row, args, multiple=False):
    """A constant value (returns itself).
    """
    return args[0]

def tagref(row, args):
    """A single tag pattern standing alone.
    @param row: the HXL data row
    @param args: the arguments parsed
    """
    return row.get(args[0])

def add(row, args, multiple=False):
    """An addition statement
    X + Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the sum of the arguments
    """
    result = 0
    for arg in _deref(row, args, multiple):
        result += _num(arg)
    return result

def subtract(row, args, multiple=False):
    """A subtraction statement
    X - Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the result of subtracting all of the following arguments from the first one
    """
    args = _deref(row, args, multiple)
    result = _num(args[0]) if len(args) > 0 else 0
    for arg in args[1:]:
        result -= _num(arg)
    return result

def multiply(row, args, multiple=False):
    """A multiplication statement
    X * Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the product of the arguments
    """
    args = _deref(row, args, multiple)
    result = _num(args[0]) if len(args) > 0 else 0
    for arg in args[1:]:
        result *= _num(arg)
    return result

def divide(row, args, multiple=False):
    """A division statement
    X / Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the result of dividing the first argument by all of the following ones, in order.
    """
    args = _deref(row, args, multiple)
    result = _num(args[0]) if len(args) > 0 else 0
    for arg in args[1:]:
        v = _num(arg) # avoid DIV0
        if v:
            result = result / v
        else:
            return 'NaN'
    return result

def modulo(row, args, multiple=False):
    """A modulo division statement
    X / Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the remainder from dividing the first argument by all of the following ones, in order.
    """
    args = _deref(row, args, multiple)
    result = _num(args[0]) if len(args) > 0 else 0
    for arg in args[1:]:
        v = _num(arg) # avoid DIV0
        if v:
            result = result % v
    return result


#
# User-callable functions
#

def function(row, args):
    """Execute a named function
    function(arg, arg...)
    @param row: the HXL data row
    @param args: the arguments parsed (the first one is the function name)
    @returns: the result of executing the function on the arguments
    """
    f = FUNCTIONS.get(args[0])
    if f:
        return f(row, args[1:], True)
    else:
        logup('Unknown function', {"function": args[0]}, level='error')
        logger.error("Unknown function %s", args[0])
        return ''

def do_min(row, args, multiple=True):
    """Find the minimum value in the list.
    If they're all numbers (or empty), use numeric comparison.
    Otherwise, use lexical comparison (case- and space-insensitive)
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the minimum value
    """

    values = _deref(row, args, multiple)

    # first, try a numbery comparison
    try:
        min_value = None
        for value in values:
            if not hxl.datatypes.is_empty(value):
                value = hxl.datatypes.normalise_number(value)
                if min_value is None or min_value > value:
                    min_value = value
        return min_value
    # if that fails, revert to lexical
    except:
        min_value = None
        min_value_norm = None
        for value in values:
            if not hxl.datatypes.is_empty(value):
                norm = hxl.datatypes.normalise_string(value)
                if min_value_norm is None or norm < min_value_norm:
                    min_value_norm = norm
                    min_value = value
        return min_value

def do_max(row, args, multiple=True):
    """Find the maximum value in the list.
    If they're all numbers (or empty), use numeric comparison.
    Otherwise, use lexical comparison (case- and space-insensitive)
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the maximum value
    """

    values = _deref(row, args, multiple)

    # first, try a numbery comparison
    try:
        max_value = None
        for value in values:
            if not hxl.datatypes.is_empty(value):
                value = hxl.datatypes.normalise_number(value)
                if max_value is None or max_value < value:
                    max_value = value
        return max_value
    # if that fails, revert to lexical
    except:
        max_value = None
        max_value_norm = None
        for value in values:
            if not hxl.datatypes.is_empty(value):
                norm = hxl.datatypes.normalise_string(value)
                if max_value_norm is None or norm > max_value_norm:
                    max_value_norm = norm
                    max_value = value
        return max_value

def do_average(row, args, multiple=True):
    """Calculate the average (mean) of the arguments
    Ignores any cell that does not contain a number.
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the mean of all numeric arguments, or empty string if none found
    """
    values = _deref(row, args, multiple)

    total = 0
    count = 0

    # look for numbers
    for value in values:
        try:
            total += hxl.datatypes.normalise_number(value)
            count += 1
        except:
            pass # not a number

    # if there were no numbers, don't return a result
    if count > 0:
        return total / count
    else:
        return ''

def do_round(row, args, multiple=False):
    """Round a single value to the nearest integer.
    @param row: the HXL data row
    @param args: the function argument (name removed from start)
    @returns: the first argument, rounded if it's a number, or unchanged otherwise
    """
    values = _deref(row, args, False)
    if len(values) > 1:
        logup('Ignoring extra arguments to round()', {"args": str(values[1:])}, level='warning')
        logger.warning("Ignoring extra arguments to round(): %s", str(values[1:]))
    try:
        return round(values[0])
    except:
        logup('Trying to round non-numeric value', {"value": str(values[0])}, level='warning')
        logger.warning("Trying to round non-numeric value %s", values[0])
        return values[0]

def do_join(row, args, multiple=True):
    """Join values with the separator provided.
    Also joins empty values (for consistency)
    USAGE: join(sep, value1[, ...])
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: all of the arguments, joined together
    """
    values = _deref(row, args, multiple)
    separator = values[0]
    return separator.join(values[1:])


def do_today(row, args, multiple=False):
    """Return the current date (UTC) in ISO format YYYY-mm-dd
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the current UTC date in ISO YYYY-mm-dd format
    """
    return datetime.datetime.utcnow().strftime('%Y-%m-%d')


def do_datedif(row, args, multiple=False):
    """Calculate the difference between the first date and the second.
    The optional internal units arg determines the unit of measurement.
    USAGE: datedif(date1, date2[, unit])
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the difference between the dates as an integer.
    """
    values = _deref(row, args, multiple)
    if len(values) == 2:
        unit = 'D'
    elif len(values) == 3:
        unit = str(values[2]).upper()
    else:
        logup('Wrong number of arguments to datedif()', level='error')
        logger.error("Wrong number of arguments to datedif()")
        return ''
    try:
        date1 = datetime.datetime.strptime(hxl.datatypes.normalise_date(values[0]), '%Y-%m-%d')
    except:
        logup("Can't parse date", {"date": str(values[0])}, level='error')
        logger.error("Can't parse date: %s", values[0])
        return ''
    try:
        date2 = datetime.datetime.strptime(hxl.datatypes.normalise_date(values[1]), '%Y-%m-%d')
    except:
        logup("Can't parse date", {"date": str(values[1])}, level='error')
        logger.error("Can't parse date: %s", values[1])
        return ''
    diff = date2-date1
    if unit == 'Y':
        return int(abs(diff.days/365))
    elif unit == 'M':
        return abs(int(round(diff.days/30)))
    elif unit == 'W':
        return abs(int(round(diff.days/7)))
    elif unit == 'D':
        return abs(diff.days)
    else:
        logup('Unrecognised unit for datediff()', {"unit": str(unit)}, level='error')
        logger.error("Unrecognised unit %s for datediff()", str(unit))
        return ''


def do_toupper(row, args, multiple=False):
    """Convert the value to a string in upper case
    USAGE: toupper(value)
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the value as an upper-case string
    """
    values = _deref(row, args, multiple)
    return str(values[0]).upper()


def do_tolower(row, args, multiple=False):
    """Convert the value to a string in lower case
    USAGE: tolower(value)
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the value as an upper-case string
    """
    values = _deref(row, args, multiple)
    return str(values[0]).lower()


FUNCTIONS = {
    'sum': lambda row, args, multiple: add(row, args, multiple),
    'product': lambda row, args, multiple: multiply(row, args, multiple),
    'min': do_min,
    'max': do_max,
    'average': do_average,
    'round': do_round,
    'join': do_join,
    'today': do_today,
    'datedif': do_datedif,
    'toupper': do_toupper,
    'tolower': do_tolower,
}
"""Master table of user-callable functions"""


#
# Internal helpers
#

def _deref(row, args, multiple=False):
    """Dereference a term.
    If it's a two-element list with a function and a list, recurse.
    If it's a tag pattern, look it up in the row and replace with value(s)
    If it's already a literal (number or string), leave it alone.
    @param row: a hxl.model.Row object
    @param args: a list of arguments to dereference (may be tag patterns or literals)
    @param multiple: if true, return all matches for a tag pattern
    @return: always a list (may be empty)
    """
    result = []

    for arg in args:
        if isinstance(arg, collections.abc.Sequence) and callable(arg[0]):
            # it's a function and args: recurse
            if arg[0] == tagref:
                result += _deref(row, arg[1], multiple)
            else:
                result.append(arg[0](row, arg[1]))
        elif isinstance(arg, hxl.model.TagPattern):
            # it's a tag pattern: look up matching values in the row
            if multiple:
                result += row.get_all(arg)
            else:
                result.append(row.get(arg))
        else:
            # it's a literal: leave it alone
            result.append(arg)


    return result

def _num(arg):
    """Convert to a number if possible.
    Otherwise, return zero and log a warning.
    """
    if not arg:
        return 0
    try:
        return hxl.datatypes.normalise_number(arg)
    except (ValueError, TypeError):
        logup('Cannot convert to a number for calculated field', {"arg": arg}, level='info')
        logger.info("Cannot convert %s to a number for calculated field", arg)
        return 0

Global variables

var FUNCTIONS

Master table of user-callable functions

Functions

def add(row, args, multiple=False)

An addition statement X + Y @param row: the HXL data row @param args: the arguments parsed @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form) @returns: the sum of the arguments

Expand source code
def add(row, args, multiple=False):
    """An addition statement
    X + Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the sum of the arguments
    """
    result = 0
    for arg in _deref(row, args, multiple):
        result += _num(arg)
    return result
def const(row, args, multiple=False)

A constant value (returns itself).

Expand source code
def const(row, args, multiple=False):
    """A constant value (returns itself).
    """
    return args[0]
def divide(row, args, multiple=False)

A division statement X / Y @param row: the HXL data row @param args: the arguments parsed @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form) @returns: the result of dividing the first argument by all of the following ones, in order.

Expand source code
def divide(row, args, multiple=False):
    """A division statement
    X / Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the result of dividing the first argument by all of the following ones, in order.
    """
    args = _deref(row, args, multiple)
    result = _num(args[0]) if len(args) > 0 else 0
    for arg in args[1:]:
        v = _num(arg) # avoid DIV0
        if v:
            result = result / v
        else:
            return 'NaN'
    return result
def do_average(row, args, multiple=True)

Calculate the average (mean) of the arguments Ignores any cell that does not contain a number. @param row: the HXL data row @param args: the function arguments (name removed from start) @returns: the mean of all numeric arguments, or empty string if none found

Expand source code
def do_average(row, args, multiple=True):
    """Calculate the average (mean) of the arguments
    Ignores any cell that does not contain a number.
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the mean of all numeric arguments, or empty string if none found
    """
    values = _deref(row, args, multiple)

    total = 0
    count = 0

    # look for numbers
    for value in values:
        try:
            total += hxl.datatypes.normalise_number(value)
            count += 1
        except:
            pass # not a number

    # if there were no numbers, don't return a result
    if count > 0:
        return total / count
    else:
        return ''
def do_datedif(row, args, multiple=False)

Calculate the difference between the first date and the second. The optional internal units arg determines the unit of measurement. USAGE: datedif(date1, date2[, unit]) @param row: the HXL data row @param args: the function arguments (name removed from start) @returns: the difference between the dates as an integer.

Expand source code
def do_datedif(row, args, multiple=False):
    """Calculate the difference between the first date and the second.
    The optional internal units arg determines the unit of measurement.
    USAGE: datedif(date1, date2[, unit])
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the difference between the dates as an integer.
    """
    values = _deref(row, args, multiple)
    if len(values) == 2:
        unit = 'D'
    elif len(values) == 3:
        unit = str(values[2]).upper()
    else:
        logup('Wrong number of arguments to datedif()', level='error')
        logger.error("Wrong number of arguments to datedif()")
        return ''
    try:
        date1 = datetime.datetime.strptime(hxl.datatypes.normalise_date(values[0]), '%Y-%m-%d')
    except:
        logup("Can't parse date", {"date": str(values[0])}, level='error')
        logger.error("Can't parse date: %s", values[0])
        return ''
    try:
        date2 = datetime.datetime.strptime(hxl.datatypes.normalise_date(values[1]), '%Y-%m-%d')
    except:
        logup("Can't parse date", {"date": str(values[1])}, level='error')
        logger.error("Can't parse date: %s", values[1])
        return ''
    diff = date2-date1
    if unit == 'Y':
        return int(abs(diff.days/365))
    elif unit == 'M':
        return abs(int(round(diff.days/30)))
    elif unit == 'W':
        return abs(int(round(diff.days/7)))
    elif unit == 'D':
        return abs(diff.days)
    else:
        logup('Unrecognised unit for datediff()', {"unit": str(unit)}, level='error')
        logger.error("Unrecognised unit %s for datediff()", str(unit))
        return ''
def do_join(row, args, multiple=True)

Join values with the separator provided. Also joins empty values (for consistency) USAGE: join(sep, value1[, …]) @param row: the HXL data row @param args: the function arguments (name removed from start) @returns: all of the arguments, joined together

Expand source code
def do_join(row, args, multiple=True):
    """Join values with the separator provided.
    Also joins empty values (for consistency)
    USAGE: join(sep, value1[, ...])
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: all of the arguments, joined together
    """
    values = _deref(row, args, multiple)
    separator = values[0]
    return separator.join(values[1:])
def do_max(row, args, multiple=True)

Find the maximum value in the list. If they're all numbers (or empty), use numeric comparison. Otherwise, use lexical comparison (case- and space-insensitive) @param row: the HXL data row @param args: the function arguments (name removed from start) @returns: the maximum value

Expand source code
def do_max(row, args, multiple=True):
    """Find the maximum value in the list.
    If they're all numbers (or empty), use numeric comparison.
    Otherwise, use lexical comparison (case- and space-insensitive)
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the maximum value
    """

    values = _deref(row, args, multiple)

    # first, try a numbery comparison
    try:
        max_value = None
        for value in values:
            if not hxl.datatypes.is_empty(value):
                value = hxl.datatypes.normalise_number(value)
                if max_value is None or max_value < value:
                    max_value = value
        return max_value
    # if that fails, revert to lexical
    except:
        max_value = None
        max_value_norm = None
        for value in values:
            if not hxl.datatypes.is_empty(value):
                norm = hxl.datatypes.normalise_string(value)
                if max_value_norm is None or norm > max_value_norm:
                    max_value_norm = norm
                    max_value = value
        return max_value
def do_min(row, args, multiple=True)

Find the minimum value in the list. If they're all numbers (or empty), use numeric comparison. Otherwise, use lexical comparison (case- and space-insensitive) @param row: the HXL data row @param args: the function arguments (name removed from start) @returns: the minimum value

Expand source code
def do_min(row, args, multiple=True):
    """Find the minimum value in the list.
    If they're all numbers (or empty), use numeric comparison.
    Otherwise, use lexical comparison (case- and space-insensitive)
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the minimum value
    """

    values = _deref(row, args, multiple)

    # first, try a numbery comparison
    try:
        min_value = None
        for value in values:
            if not hxl.datatypes.is_empty(value):
                value = hxl.datatypes.normalise_number(value)
                if min_value is None or min_value > value:
                    min_value = value
        return min_value
    # if that fails, revert to lexical
    except:
        min_value = None
        min_value_norm = None
        for value in values:
            if not hxl.datatypes.is_empty(value):
                norm = hxl.datatypes.normalise_string(value)
                if min_value_norm is None or norm < min_value_norm:
                    min_value_norm = norm
                    min_value = value
        return min_value
def do_round(row, args, multiple=False)

Round a single value to the nearest integer. @param row: the HXL data row @param args: the function argument (name removed from start) @returns: the first argument, rounded if it's a number, or unchanged otherwise

Expand source code
def do_round(row, args, multiple=False):
    """Round a single value to the nearest integer.
    @param row: the HXL data row
    @param args: the function argument (name removed from start)
    @returns: the first argument, rounded if it's a number, or unchanged otherwise
    """
    values = _deref(row, args, False)
    if len(values) > 1:
        logup('Ignoring extra arguments to round()', {"args": str(values[1:])}, level='warning')
        logger.warning("Ignoring extra arguments to round(): %s", str(values[1:]))
    try:
        return round(values[0])
    except:
        logup('Trying to round non-numeric value', {"value": str(values[0])}, level='warning')
        logger.warning("Trying to round non-numeric value %s", values[0])
        return values[0]
def do_today(row, args, multiple=False)

Return the current date (UTC) in ISO format YYYY-mm-dd @param row: the HXL data row @param args: the function arguments (name removed from start) @returns: the current UTC date in ISO YYYY-mm-dd format

Expand source code
def do_today(row, args, multiple=False):
    """Return the current date (UTC) in ISO format YYYY-mm-dd
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the current UTC date in ISO YYYY-mm-dd format
    """
    return datetime.datetime.utcnow().strftime('%Y-%m-%d')
def do_tolower(row, args, multiple=False)

Convert the value to a string in lower case USAGE: tolower(value) @param row: the HXL data row @param args: the function arguments (name removed from start) @returns: the value as an upper-case string

Expand source code
def do_tolower(row, args, multiple=False):
    """Convert the value to a string in lower case
    USAGE: tolower(value)
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the value as an upper-case string
    """
    values = _deref(row, args, multiple)
    return str(values[0]).lower()
def do_toupper(row, args, multiple=False)

Convert the value to a string in upper case USAGE: toupper(value) @param row: the HXL data row @param args: the function arguments (name removed from start) @returns: the value as an upper-case string

Expand source code
def do_toupper(row, args, multiple=False):
    """Convert the value to a string in upper case
    USAGE: toupper(value)
    @param row: the HXL data row
    @param args: the function arguments (name removed from start)
    @returns: the value as an upper-case string
    """
    values = _deref(row, args, multiple)
    return str(values[0]).upper()
def function(row, args)

Execute a named function function(arg, arg…) @param row: the HXL data row @param args: the arguments parsed (the first one is the function name) @returns: the result of executing the function on the arguments

Expand source code
def function(row, args):
    """Execute a named function
    function(arg, arg...)
    @param row: the HXL data row
    @param args: the arguments parsed (the first one is the function name)
    @returns: the result of executing the function on the arguments
    """
    f = FUNCTIONS.get(args[0])
    if f:
        return f(row, args[1:], True)
    else:
        logup('Unknown function', {"function": args[0]}, level='error')
        logger.error("Unknown function %s", args[0])
        return ''
def modulo(row, args, multiple=False)

A modulo division statement X / Y @param row: the HXL data row @param args: the arguments parsed @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form) @returns: the remainder from dividing the first argument by all of the following ones, in order.

Expand source code
def modulo(row, args, multiple=False):
    """A modulo division statement
    X / Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the remainder from dividing the first argument by all of the following ones, in order.
    """
    args = _deref(row, args, multiple)
    result = _num(args[0]) if len(args) > 0 else 0
    for arg in args[1:]:
        v = _num(arg) # avoid DIV0
        if v:
            result = result % v
    return result
def multiply(row, args, multiple=False)

A multiplication statement X * Y @param row: the HXL data row @param args: the arguments parsed @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form) @returns: the product of the arguments

Expand source code
def multiply(row, args, multiple=False):
    """A multiplication statement
    X * Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the product of the arguments
    """
    args = _deref(row, args, multiple)
    result = _num(args[0]) if len(args) > 0 else 0
    for arg in args[1:]:
        result *= _num(arg)
    return result
def subtract(row, args, multiple=False)

A subtraction statement X - Y @param row: the HXL data row @param args: the arguments parsed @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form) @returns: the result of subtracting all of the following arguments from the first one

Expand source code
def subtract(row, args, multiple=False):
    """A subtraction statement
    X - Y
    @param row: the HXL data row
    @param args: the arguments parsed
    @param multiple: if true, allow tag patterns to expand to multiple values (used only for function form, not operator form)
    @returns: the result of subtracting all of the following arguments from the first one
    """
    args = _deref(row, args, multiple)
    result = _num(args[0]) if len(args) > 0 else 0
    for arg in args[1:]:
        result -= _num(arg)
    return result
def tagref(row, args)

A single tag pattern standing alone. @param row: the HXL data row @param args: the arguments parsed

Expand source code
def tagref(row, args):
    """A single tag pattern standing alone.
    @param row: the HXL data row
    @param args: the arguments parsed
    """
    return row.get(args[0])