#ProcessorSurveyUtils.py
"""Functions used to process multiple input formats."""

import datetime
import logging
import re

from pandas import Series, DataFrame, concat

from ProcessorUtils import (
        add_filters_to_sublogger,
)

logger = logging.getLogger('Processor.Surveys.Utils')
add_filters_to_sublogger(logger)

# parse a location string into a Series with ODK-format column names
def parse_location_str(location_str):
    '''Parse a space-separated 'lat lon alt acc' string into a Series
    indexed by the ODK location column names.'''

    # expecting a space-separated string containing four numbers which
    # contain a decimal point; lat and lon may be negative
    regex = r'(?P<lat>-?[0-9.]+)\s(?P<lon>-?[0-9.]+)\s(?P<alt>[0-9.]+)\s(?P<acc>[0-9.]+)'

    # needed because the ODK names are too complicated for regex named groups
    name_dict = {
        'lat' : 'survey_infromation-location-Latitude',
        'lon' : 'survey_infromation-location-Longitude',
        'alt' : 'survey_infromation-location-Altitude',
        'acc' : 'survey_infromation-location-Accuracy'
        }

    res = re.search(regex,location_str)

    loc_series = Series(res.groupdict())

    loc_series.rename(index=name_dict,inplace=True)

    return loc_series
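
# Illustrative example (values invented, not from real survey data):
#   parse_location_str('51.5072 -0.1276 11.0 5.0')
# returns a Series of the matched strings indexed by the four ODK names:
#   survey_infromation-location-Latitude     51.5072
#   survey_infromation-location-Longitude    -0.1276
#   survey_infromation-location-Altitude     11.0
#   survey_infromation-location-Accuracy     5.0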

def convert_date(date_str,fmt_in,fmt_out):
    '''Convert a single date string from fmt_in to fmt_out, normalising the
    irregular timezone offsets found in kobotoolbox data.'''

    # in case any nan's creep in
    if str(date_str)=='nan':
        return 'nan'

    # timezones in kobotoolbox data are irregular:
    # datetime needs +HHMM, so use regexes to detect the offset formats
    # that appear and normalise them
    pattern1 = r'\+[0-9][0-9]$'
    if re.search(pattern1,date_str):
        # offset is bare +HH: append empty MM
        date_str = date_str + '00'
    pattern2 = r'\+([0-9][0-9]):([0-9][0-9])$'
    if re.search(pattern2,date_str):
        # offset is +HH:MM: strip the colon to get +HHMM
        date_str = re.sub(pattern2,r'+\g<1>\g<2>',date_str)

    date_in = datetime.datetime.strptime(date_str,fmt_in)
    date_str_out = date_in.strftime(fmt_out)

    return date_str_out
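
# Illustrative example (invented timestamp): a kobotoolbox-style '+HH:MM'
# offset is normalised to '+HHMM' before parsing, so
#   convert_date('2023-05-01T12:30:00+03:00','%Y-%m-%dT%H:%M:%S%z','%b %d, %Y')
# should return 'May 01, 2023'.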

def parse_location_kobotoolbox(series):
    '''Apply parse_location_str to each entry of a Series of location
    strings, returning a DataFrame with the four ODK location columns.'''

    loc_df = series.apply(parse_location_str)

    return loc_df

def parse_date(series,name_out='date',fmt_in='%Y-%m-%d',fmt_out='%b %d, %Y'):
    '''Convert a Series of date strings from fmt_in to fmt_out, renaming
    the result to name_out.'''

    s_out = series.apply(convert_date,fmt_in=fmt_in,fmt_out=fmt_out)

    s_out.rename(name_out,inplace=True)

    return s_out
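
# Illustrative example (invented dates): with the defaults, a Series like
# ['2023-05-01','2023-05-02'] becomes a Series named 'date' with values
# ['May 01, 2023','May 02, 2023'].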

def parse_cases(series, name_out, cases, dtype = None, fillna = None):
    '''Map a Series of categorical entries through the cases dict, cast to
    dtype, and rename the result to name_out.'''

    if dtype is None:
        dtype = series.dtype

    # Handle nans explicitly, without mutating the caller's data
    if fillna is not None:
        series = series.fillna(fillna)

    # Converting entries according to a cases dictionary
    # nans cannot be handled by map
    series_out = series.map(cases).astype(dtype)
    
    # Renaming series
    series_out.rename(name_out,inplace=True)

    return series_out
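
# Illustrative example (invented categories):
#   parse_cases(Series(['yes','no',None]), 'confirmed',
#               cases={'yes': 1, 'no': 0, 'unknown': -1},
#               dtype=int, fillna='unknown')
# maps each entry through cases and returns a Series named 'confirmed'
# with values [1, 0, -1].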

# dict of functions callable within coln_parser_dict
# so they can be obtained with a string in coln_parser_dict
func_dict = {
    'parse_date' : parse_date,
    'parse_location_kobotoolbox' : parse_location_kobotoolbox,
    'parse_cases' : parse_cases
}
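
# Illustrative coln_parser_dict entries, one for each kind of value
# (column names here are hypothetical):
#   {
#       'internal_id'    : 'None',                                   # drop
#       'FieldName'      : 'field_name',                             # rename
#       'SubmissionDate' : ('parse_date', [('name_out', 'date')]),   # apply func
#   }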

def parse_columns(df_in,coln_parser_dict):
    '''Converts datasets according to coln_parser_dict. This is used by survey
    processors to achieve a conventional format. Works on each type of
    conversion in turn.

    coln_parser_dict is the configuration used to convert columns:
    - keys are column names in the input dataframe
    - a value of 'None' (the string) means the column is dropped
    - a string value renames the column
    - a tuple value applies a function: the first item is the string
      identifier of the function in func_dict, the second is a list of
      (key, value) pairs to be provided as kwargs. The function returns a
      Series or DataFrame, and the original column is dropped.
    # TODO: is it necessary to provide dtype conversion somewhere (e.g. dates)?'''

    df_out = df_in.copy()

    # drop any indicated columns
    coln_drop_list = [k for k,v in coln_parser_dict.items() if v == 'None']
    logger.info(f"Dropping {len(coln_drop_list)} columns")
    logger.debug(f"Columns being dropped are {coln_drop_list}")
    for key in coln_drop_list:
        del df_out[key]

    # rename any indicated columns ('None' entries were already dropped above)
    coln_rename_dict = {k:v for k,v in coln_parser_dict.items() if isinstance(v,str) and v != 'None'}
    logger.info(f"Renaming {len(coln_rename_dict)} columns")
    logger.debug(f"Columns being renamed are {coln_rename_dict}")
    df_out.rename(columns=coln_rename_dict,inplace=True)

    # apply any functions
    # callable only works in python 3.2+ apparently
    coln_func_dict = {k:v for k,v in coln_parser_dict.items() if isinstance(v,tuple)}
    logger.info(f"Applying {len(coln_func_dict)} functions to columns")
    logger.debug(f"Columns being renamed are {coln_rename_dict}")
    dfs_to_concat = [df_out]

    for key,val in coln_func_dict.items():

        # TODO: there is a more pythonic way to get functions with a string
        func = func_dict[val[0]]
        assert callable(func)
        kwargs = {k:v for k,v in val[1]}
        columns_out = func(df_in[key],**kwargs)

        if isinstance(columns_out,DataFrame):
            num_outputs = columns_out.shape[-1]
            column_names = columns_out.columns

        elif isinstance(columns_out,Series):
            num_outputs = 1
            column_names = [columns_out.name]

        logger.info(f"Adding {num_outputs} columns to dataframe")
        logger.debug(f"New columns are {column_names}")

        dfs_to_concat += [columns_out]

        # drop the original column, now that it has been parsed with func
        del df_out[key]

    df_final = concat(dfs_to_concat,axis='columns')

    return df_final
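
if __name__ == '__main__':
    # Minimal smoke test with invented column names and data; only runnable
    # where ProcessorUtils is importable (it is imported at module level).
    demo_df = DataFrame({
        'SubmissionDate': ['2023-05-01', '2023-05-02'],
        'Status': ['yes', 'no'],
        'Note': ['free text', 'more text'],
    })
    demo_config = {
        'SubmissionDate': ('parse_date', [('name_out', 'date')]),
        'Status': ('parse_cases', [('name_out', 'status'),
                                   ('cases', {'yes': 1, 'no': 0})]),
        'Note': 'None',
    }
    print(parse_columns(demo_df, demo_config))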