# NOTE(review): the lines below are residue from a repository web-page scrape
# (error banner / author line / file-size line), not part of the module.
# Commented out so the file parses; original text preserved:
#   Something went wrong on our end
#   J.W. Smith authored
#   Code owners: Assign users and groups as approvers for specific file changes.
#   ProcessorSurveyUtils.py 5.33 KiB
#ProcessorSurveyUtils.py
"""Functions used to process multiple input formats."""
import datetime
import logging
import re
from pandas import Series, DataFrame, concat
from ProcessorUtils import (
add_filters_to_sublogger,
)
logger = logging.getLogger('Processor.Surveys.Utils')
add_filters_to_sublogger(logger)
#parse columns into ODK format
def parse_location_str(location_str):
    """Parse a space-separated GPS string into an ODK-named Series.

    Expects four space-separated numbers: latitude, longitude, altitude and
    accuracy (lat/lon may carry a leading minus sign). The group values are
    kept as strings, exactly as matched.

    :param location_str: e.g. '12.34 -56.78 100.0 5.0'
    :returns: Series indexed by the ODK location column names
    :raises ValueError: if the string does not contain four such numbers
    """
    # A leading '-' is optional on lat/lon; alt/acc are non-negative.
    # (The original class [-?0-9\.] wrongly admitted a literal '?' character.)
    regex = r'(?P<lat>-?[0-9.]+)\s(?P<lon>-?[0-9.]+)\s(?P<alt>[0-9.]+)\s(?P<acc>[0-9.]+)'
    # needed because the ODK names are too complicated for regex named groups
    # NOTE: 'infromation' is spelled as in the upstream ODK form — do not "fix".
    name_dict = {
        'lat' : 'survey_infromation-location-Latitude',
        'lon' : 'survey_infromation-location-Longitude',
        'alt' : 'survey_infromation-location-Altitude',
        'acc' : 'survey_infromation-location-Accuracy'
    }
    res = re.search(regex, location_str)
    if res is None:
        # fail loudly instead of the original AttributeError on .groupdict()
        raise ValueError(f"Unable to parse location string: {location_str!r}")
    loc_series = Series(res.groupdict())
    loc_series.rename(index=name_dict, inplace=True)
    return loc_series
def convert_date(date_str, fmt_in, fmt_out):
    """Convert a date string from fmt_in to fmt_out.

    Normalises the irregular timezone suffixes seen in kobotoolbox data
    ('+HH' or '+HH:MM') to the '+HHMM' form that strptime's %z expects.
    'nan' inputs (including float nan stringified) are passed through as 'nan'.

    :param date_str: the date string to convert
    :param fmt_in: strptime format of date_str (after tz normalisation)
    :param fmt_out: strftime format of the returned string
    :returns: re-formatted date string, or 'nan'
    """
    # in case any nan's creep in
    if str(date_str) == 'nan':
        return 'nan'
    # timezones in kobotoolbox data are irregular; datetime needs +HHMM,
    # so use regexes to detect and normalise the two known variants.
    # Raw strings fix the invalid '\+' escape in the original patterns.
    # NOTE(review): only '+' offsets are handled; matching '-HH$' as well
    # would falsely trigger on plain dates like '2021-05', so it is not added.
    pattern1 = r'\+[0-9][0-9]$'
    if re.search(pattern1, date_str):
        # need to provide empty MM
        date_str = date_str + '00'
    pattern2 = r'\+([0-9][0-9]):([0-9][0-9])$'
    if re.search(pattern2, date_str):
        # drop the colon: +HH:MM -> +HHMM
        date_str = re.sub(pattern2, r'+\g<1>\g<2>', date_str)
    date_in = datetime.datetime.strptime(date_str, fmt_in)
    date_str_out = date_in.strftime(fmt_out)
    return date_str_out
def parse_location_kobotoolbox(series):
    """Parse each kobotoolbox location string in *series* via parse_location_str.

    Returns a DataFrame with one ODK-named location column per component.
    """
    return series.apply(parse_location_str)
def parse_date(series, name_out='date', fmt_in='%Y-%m-%d', fmt_out='%b %d, %Y'):
    """Reformat every date string in *series* from fmt_in to fmt_out.

    The returned Series is named *name_out*; the input is not modified.
    """
    converted = series.apply(convert_date, fmt_in=fmt_in, fmt_out=fmt_out)
    return converted.rename(name_out)
def parse_cases(series, name_out, cases, dtype=None, fillna=None):
    """Map each entry of *series* to a new value via the *cases* dict.

    :param series: input Series; NOT modified (the original mutated it via
        fillna(..., inplace=True), a shared-argument mutation bug)
    :param name_out: name given to the returned Series
    :param cases: dict mapping input values to output values
    :param dtype: dtype of the output; defaults to the input series' dtype
    :param fillna: value substituted for nans before mapping (nans cannot
        be handled by map)
    :returns: converted, renamed Series
    """
    if dtype is None:
        dtype = series.dtype
    # Handle nans explicitly — rebind to a filled copy so the caller's
    # series is left untouched (bugfix: was inplace=True)
    if fillna is not None:
        series = series.fillna(fillna)
    # Converting entries according to a cases dictionary
    # nans cannot be handled by map
    series_out = series.map(cases).astype(dtype)
    # Renaming series
    series_out.rename(name_out, inplace=True)
    return series_out
# dict of functions callable within coln_parser_dict
# so they can be obtained with a string in coln_parser_dict
# (parse_columns looks up val[0] of each tuple value here, so these string
# keys are the identifiers survey configs must use)
func_dict = {
    'parse_date' : parse_date,
    'parse_location_kobotoolbox' : parse_location_kobotoolbox,
    'parse_cases' : parse_cases
}
def parse_columns(df_in, coln_parser_dict):
    '''Converts datasets according to coln_parser_dict. This is used by survey
    processors to achieve a conventional format. Works on each type of
    conversion in turn.

    coln_parser_dict is the configuration used to convert columns:
    - keys are column names in the input dataframe
    - values that are the string 'None' mean the column should be dropped
    - values that are any other string simply rename the column
    - values that are tuples describe a runnable function with kwargs, where
      the first item is the string identifier of the function (a key of
      func_dict) and the second is a list of key,value pairs provided as
      kwargs; the function returns a Series/DataFrame that replaces the
      key column, which is dropped.

    :raises TypeError: if a parser function returns neither Series nor
        DataFrame (previously this fell through to a NameError)
    # TODO: is it neccesary to provide dtype conversion somewhere (e.g. dates)?'''
    df_out = df_in.copy()
    # drop any indicated columns
    coln_drop_list = [k for k, v in coln_parser_dict.items() if v == 'None']
    logger.info(f"Dropping {len(coln_drop_list)} columns")
    logger.debug(f"Columns being dropped are {coln_drop_list}")
    for key in coln_drop_list:
        del df_out[key]
    # rename any indicated columns
    coln_rename_dict = {k: v for k, v in coln_parser_dict.items() if isinstance(v, str)}
    logger.info(f"Renaming {len(coln_rename_dict)} columns")
    logger.debug(f"Columns being renamed are {coln_rename_dict}")
    df_out.rename(columns=coln_rename_dict, inplace=True)
    # apply any functions
    # callable only works in python 3.2+ apparently
    coln_func_dict = {k: v for k, v in coln_parser_dict.items() if isinstance(v, tuple)}
    logger.info(f"Applying {len(coln_func_dict)} functions to columns")
    # bugfix: this debug line previously logged coln_rename_dict again
    logger.debug(f"Columns having functions applied are {list(coln_func_dict.keys())}")
    dfs_to_concat = [df_out]
    for key, val in coln_func_dict.items():
        # TODO: there is a more pythonic way to get functions with a string
        func = func_dict[val[0]]
        assert callable(func)
        kwargs = {k: v for k, v in val[1]}
        # note: parser functions read from the ORIGINAL df_in column,
        # so drops/renames above cannot interfere
        columns_out = func(df_in[key], **kwargs)
        if isinstance(columns_out, DataFrame):
            num_outputs = columns_out.shape[-1]
            column_names = columns_out.columns
        elif isinstance(columns_out, Series):
            num_outputs = 1
            column_names = [columns_out.name]
        else:
            raise TypeError(
                    f"Function {val[0]} returned {type(columns_out)}; "
                    "expected Series or DataFrame")
        logger.info(f"Adding {num_outputs} columns to dataframe")
        logger.debug(f"New columns are {column_names}")
        dfs_to_concat += [columns_out]
        # drop the original column, now that it has been parsed with func
        del df_out[key]
    df_final = concat(dfs_to_concat, axis='columns')
    return df_final