diff --git a/coordinator/ProcessorSurveyUtils.py b/coordinator/ProcessorSurveyUtils.py
index 100b43222fcd30b1a219646f5e25b90531f2e1ed..dc3c39f7cfae266e24d79e0668e2502ebe98e32d 100644
--- a/coordinator/ProcessorSurveyUtils.py
+++ b/coordinator/ProcessorSurveyUtils.py
@@ -110,8 +110,8 @@ def parse_columns(df_in,coln_parser_dict):
     - values that are 'None' mean they should be dropped
     - values that are string simply rename the column
     - values that are tuples should be a runnable function with kwargs, where
-      the first item is the string identifier of the functionre and the rest is a
-      list of key,value pairs to be provided as kwargs, returns series/dataframe,
+      the first item is the string identifier of the function and the rest is a
+      list of key, value pairs to be provided as kwargs, returns series/dataframe,
       and drops key column.
 
     # TODO: is it neccesary to provide dtype conversion somewhere (e.g. dates)?'''
diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py
index 0a70a841f3f1f1b5d733c7f16dbe5fc14bbbd376..9b6d9e08e85e531e6e87cc56a3e5413cd60255ad 100644
--- a/coordinator/ProcessorSurveys.py
+++ b/coordinator/ProcessorSurveys.py
@@ -33,6 +33,7 @@ from Processor import Processor
 from source_gen.clustering import run_case
 
 from ProcessorSurveysODK import get_ODK_form_as_csv
+from ProcessorSurveysODKSA import get_ODK_SA_form_as_csv
 from ProcessorSurveysODKv2 import get_ODKv2_form_as_csv
 from ProcessorSurveysnewODK import get_newODK_form_as_csv
 from ProcessorSurveysnewODK2 import get_newODK2_form_as_csv
@@ -67,6 +68,7 @@ class ProcessorSurveys(Processor):
 
         self.GET_FORM_AS_CSV_DICT = {
             'ODK':         get_ODK_form_as_csv,
+            'ODK_SA':      get_ODK_SA_form_as_csv,
             'kobotoolbox': get_kobotoolbox_form_as_csv,
             'WRSIS':       get_WRSIS_form_as_csv,
             'WRT':         get_WRT_form_as_csv,
diff --git a/coordinator/ProcessorSurveysODKSA.py b/coordinator/ProcessorSurveysODKSA.py
new file mode 100644
index 0000000000000000000000000000000000000000..4ba47848c721c8b858df75d8a35201033bdf6b6a
--- /dev/null
+++ b/coordinator/ProcessorSurveysODKSA.py
@@ -0,0 +1,267 @@
+#ProcessorSurveysODKSA.py
+"""Functions for parsing wheat rust survey records from ODK with South Asia
+formatting."""
+
+import datetime
+import logging
+import os
+from csv import QUOTE_MINIMAL
+from pathlib import Path
+from string import Template
+import subprocess
+
+from pandas import read_csv
+from shutil import copyfile
+
+from ProcessorSurveyUtils import parse_columns
+from ProcessorUtils import (
+        subprocess_and_log,
+        endJob,
+        add_filters_to_sublogger,
+)
+
+logger = logging.getLogger('Processor.Surveys.ODK_SA')
+add_filters_to_sublogger(logger)
+
+# keys are column names in the input dataframe
+# values that are the string 'None' mean the column should be dropped
+# values that are strings simply rename the column
+# values that are functions are run with that key and return a series/dataframe
+COLUMN_PARSER_DICT = {
+        'SubmissionDate' : 'SubmissionDate',
+        'start' : 'start',
+        'end' : 'end',
+        'today' : 'today',
+        'deviceid' : 'deviceid',
+        'subscriberid' : 'subscriberid',
+        'imei' : 'imei',
+        'phonenumber' : 'phonenumber',
+        'username' : 'username',
+        'country_list' : 'surveyor_infromation-country',
+        'blast_rust' : 'None',
+        'surveyor_name' : 'surveyor_infromation-surveyor_name',
+        'institution' : 'surveyor_infromation-institution',
+        'mobile_num' : 'None',
+        'site_information-survey_site' : 'site_information-survey_site',
+        'site_information-crop' : 'site_information-crop',
+        'site_information-field_area' : 'site_information-field_area', # requires conversion later
+        'site_information-unit_m2' : 'site_information-unit_m2', # requires conversion later
+        'site_information-field_size' : 'site_information-field_size', # requires conversion later
+        'site_information-variety' : 'site_information-variety',
+        'site_information-growth_stage' : 'site_information-growth_stage',
+        'other_crop' : 'None',
+        'survey_infromation-location_name' : 'survey_infromation-location_name',
+        'survey_infromation-location_blast' : 'survey_infromation-location_blast',
+        'survey_infromation-sampColor' : 'survey_infromation-sampColor',
+        'survey_infromation-dateRange' : 'survey_infromation-dateRange',
+        'survey_infromation-fieldNumber' : 'survey_infromation-fieldNumber',
+        'survey_infromation-diseaseIncidencePercentage' : 'survey_infromation-diseaseIncidencePercentage',
+        'survey_infromation-severityPercentage' : 'survey_infromation-severityPercentage',
+        'survey_infromation-survey_date' : 'survey_infromation-survey_date',
+        'survey_infromation-site_name' : 'survey_infromation-location_name',
+        'survey_infromation-location-Latitude' : 'survey_infromation-location-Latitude',
+        'survey_infromation-location-Longitude' : 'survey_infromation-location-Longitude',
+        'survey_infromation-location-Altitude' : 'survey_infromation-location-Altitude',
+        'survey_infromation-location-Accuracy' : 'survey_infromation-location-Accuracy',
+        'stem_rust-stemrust_incidence' : 'stem_rust-stemrust_incidence',
+        'stem_rust-Stemrust_severity' : 'stem_rust-Stemrust_severity',
+        'stem_rust-stemrust_host_plant_reaction' : 'stem_rust-stemrust_host_plant_reaction',
+        'leaf_rust-leafrust_incidence' : 'leaf_rust-leafrust_incidence',
+        'leaf_rust-leafrust_severity' : 'leaf_rust-leafrust_severity',
+        'leaf_rust-leafrust_host_plant_reaction' : 'leaf_rust-leafrust_host_plant_reaction',
+        'yellow_rust-yellowrust_incidence' : 'yellow_rust-yellowrust_incidence',
+        'yellow_rust-yellowrust_severity' : 'yellow_rust-yellowrust_severity',
+        'yellow_rust-yellowrust_host_plant_reaction' : 'yellow_rust-yellowrust_host_plant_reaction',
+        'septoria-septoria_incidence' : 'septoria-septoria_incidence',
+        'septoria-septoria_severity' : 'septoria-septoria_severity',
+        'other_diseases_group-other_diseases' : 'other_diseases_group-other_diseases',
+        'score_diseases_count' : 'score_diseases_count',
+        'SET-OF-score_diseases' : 'SET-OF-score_diseases',
+        'samples_collected' : 'samples_collected',
+        'samples_type' : 'samples_type',
+        'sample_size-number_stemrust_live' : 'sample_size-number_stemrust_live',
+        'sample_size-number_stemrust_dead_dna' : 'sample_size-number_stemrust_dead_dna',
+        'sample_size-number_yellowrust_live' : 'sample_size-number_yellowrust_live',
+        'sample_size-number_yellowrust_dead' : 'sample_size-number_yellowrust_dead',
+        'sample_size-number_leafrust_live' : 'sample_size-number_leafrust_live',
+        'sample_size-using_barcode' : 'sample_size-using_barcode',
+        'live_stemrust_samples_count' : 'live_stemrust_samples_count',
+        'SET-OF-live_stemrust_samples' : 'SET-OF-live_stemrust_samples',
+        'dead_stemrust_samples_count' : 'dead_stemrust_samples_count',
+        'SET-OF-dead_stemrust_samples' : 'SET-OF-dead_stemrust_samples',
+        'live_yellowrust_samples_count' : 'live_yellowrust_samples_count',
+        'SET-OF-live_yellowrust_samples' : 'SET-OF-live_yellowrust_samples',
+        'dead_yellowrust_samples_count' : 'dead_yellowrust_samples_count',
+        'SET-OF-dead_yellowrust_samples' : 'SET-OF-dead_yellowrust_samples',
+        'live_leafrust_samples_count' : 'live_leafrust_samples_count',
+        'SET-OF-live_leafrust_samples' : 'SET-OF-live_leafrust_samples',
+        'comment' : 'comment',
+        'meta-instanceID' : 'meta-instanceID',
+        'meta-instanceName' : 'None',
+        'KEY' : 'KEY',
+        }
+
+
+def get_ODK_SA_form_as_csv(form_credentials: dict, jobPath: str, config: dict, status):
+    '''Given a dict with a single ODK form to download from an ODK Aggregate
+    server with South Asia formatting, obtains it and converts to csv.'''
+
+    ODK_output_path_template = config['Survey'].get('ODKDatabasePathTemplate','${WorkspacePathout}/ODK_DB/')
+    ODK_output_path = Template(ODK_output_path_template).substitute(**config)
+
+    # get data from ODK server
+    description_short = 'ODK_SA download'
+    description_long = 'survey download from ODK_SA server'
+
+    # get path to ODK executable
+    ODK_jar = form_credentials['ODK_jar']
+    assert os.path.exists(ODK_jar)
+
+    skip_download: bool = config['Survey'].get('SkipServerDownload', False)
+
+    ODK_download_success = True
+
+    if not skip_download:
+        try:
+            ODK_download = ['java',
+                    '-jar', ODK_jar,
+                    '--pull_aggregate',
+                    '--form_id', form_credentials['form_id'],
+                    '--storage_directory', ODK_output_path,
+                    '--odk_url', form_credentials['url'],
+                    '--odk_username', form_credentials['user'],
+                    '--odk_password', form_credentials['pass']]
+
+            logger.debug('Performing ' + description_long)
+
+            # perform a pull from the ODK server, and if it fails write a warning message
+            subprocess_and_log(ODK_download,description_short,description_long,log_type='warning',check=True)
+
+        except subprocess.CalledProcessError as e:
+            status.reset('WARNING')
+            ODK_download_success = False
+
+    #TODO: Check it came down cleanly ($serverOutputDir is created whether cleanly or not, so test more explicitly):
+
+    ODK_csv_path = f"{jobPath}/ExportCSV/"
+
+    Path(ODK_csv_path).mkdir(parents = True, exist_ok = True)
+
+    ODK_raw_csv_filename = f"SurveyData_{form_credentials['form_id']}.csv"
+
+    ODK_proc_csv_filename = f"SurveyData_{form_credentials['form_id']}_proc.csv"
+
+    if ODK_download_success and not skip_download:
+
+        description_short = 'ODK export'
+        description_long = 'converting ODK download to csv'
+        logger.debug(description_long)
+
+        ODK_java_to_csv = ['java',
+                '-jar', ODK_jar,
+                '--export',
+                '--form_id', form_credentials['form_id'],
+                '--storage_directory',ODK_output_path,
+                '--export_directory',ODK_csv_path,
+                '--export_filename',ODK_raw_csv_filename]
+
+        logger.debug('Performing ' + description_long)
+
+        try:
+            subprocess_and_log(ODK_java_to_csv,description_short,description_long,check=True)
+
+        except subprocess.CalledProcessError as e:
+            status.reset('WARNING')
+            ODK_download_success = False
+
+        # read the raw ODK_SA survey csv
+        dataframe_raw = read_csv(ODK_csv_path+ODK_raw_csv_filename)
+
+        # process to match ODK format
+        dataframe_processed = parse_columns(dataframe_raw,COLUMN_PARSER_DICT)
+
+        # parse area information
+        area_coln = 'site_information-field_area'
+        unit_m2_coln = 'site_information-unit_m2'
+        field_size_coln = 'site_information-field_size'
+
+        # this column should all be strings describing the local unit
+        area = dataframe_processed[area_coln]
+
+        field_size = dataframe_processed[field_size_coln] # number of local units surveyed
+        area_unit_m2 = dataframe_processed[unit_m2_coln] # area of local unit in m2
+
+        ha_per_m2 = 1.e-4 # hectare in units of m2
+
+        area_in_ha = field_size * area_unit_m2 * ha_per_m2
+
+        dataframe_processed[area_coln] = area_in_ha
+        del dataframe_processed[unit_m2_coln]
+        del dataframe_processed[field_size_coln]
+
+        logger.debug('Saving processed csv file')
+
+        dataframe_processed.to_csv(ODK_csv_path+ODK_proc_csv_filename,index=False,quoting=QUOTE_MINIMAL)
+
+    if not ODK_download_success or skip_download:
+
+        logger.info("Because ODK server download failed somewhere (or we are skipping downloads), trying to recover by copying recent download")
+
+        ODK_copy_success = False
+
+        days_back = 1
+        acceptable_days_back = int(config['Survey']['AcceptableDowntimeDays'])
+        logger.debug(f"Acceptable server downtime is set to {acceptable_days_back} days")
+
+        while ((not ODK_copy_success) and (days_back <= acceptable_days_back)):
+
+            current_date = datetime.datetime.strptime(config['StartString'],'%Y%m%d')
+
+            past_date = current_date - datetime.timedelta(days=days_back)
+
+            #past_jobPath = f"{config['WorkspacePathout']}{short_name[component]}_{past_date.strftime('%Y%m%d')}"
+            past_jobPath = f"{config['WorkspacePath']}/SURVEYDATA_{past_date.strftime('%Y%m%d')}"
+
+            past_ODK_csv_path = f"{past_jobPath}/ExportCSV/"
+
+            try:
+                # check that python or perl coordinator script succeeded for that date
+                success_py = os.path.isfile(f"{past_jobPath}/STATUS_SUCCESS")
+                success_with_warning_py = os.path.isfile(f"{past_jobPath}/STATUS_SUCCESS_WITH_WARNING")
+                success_perl = os.path.isfile(f"{past_jobPath}/SURVEYDATA_SUCCESS.txt")
+                assert success_py or success_with_warning_py or success_perl
+
+                #logger.warning(f"Temporary rename of expected previous download, for jobs before ~Apr 2021")
+                #past_ODK_csv_filename = f"SurveyData.csv"
+                past_ODK_csv_filename = ODK_proc_csv_filename
+
+                logger.info(f"Looking for {past_ODK_csv_path+past_ODK_csv_filename}")
+
+                copyfile(past_ODK_csv_path+past_ODK_csv_filename,ODK_csv_path+ODK_proc_csv_filename)
+
+                assert os.path.isfile(ODK_csv_path+ODK_proc_csv_filename)
+
+                ODK_copy_success = True
+            except:
+                logger.info(f"Did not find an ODK download in {past_ODK_csv_path}")
+
+            days_back += 1
+
+        if not ODK_copy_success:
+            logger.error("Failed to get a suitable copy of survey data.")
+            status.reset('ERROR')
+            endJob(status,premature=True)
+
+        logger.warning(f"Using ODK download from {past_jobPath}.")
+
+    return ODK_csv_path+ODK_proc_csv_filename
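
Illustrative sketch (not part of the patch): the field-area handling in get_ODK_SA_form_as_csv converts a count of local land units, together with the area of one unit in m2, into hectares, then drops the two helper columns. The column names below mirror COLUMN_PARSER_DICT; the unit names and numeric values are invented for demonstration only.

# Minimal sketch of the South Asia field-area conversion, assuming the same
# column names as COLUMN_PARSER_DICT; the survey values here are invented.
from pandas import DataFrame

df = DataFrame({
    'site_information-field_area': ['bigha', 'katha'],  # local unit name reported by the surveyor
    'site_information-field_size': [3, 10],             # number of local units surveyed
    'site_information-unit_m2':    [1337.8, 338.6],     # area of one local unit in m2 (example values)
})

ha_per_m2 = 1.e-4  # 1 m2 = 1e-4 ha

# overwrite the unit-name column with the surveyed area in hectares
df['site_information-field_area'] = (
    df['site_information-field_size'] * df['site_information-unit_m2'] * ha_per_m2
)
del df['site_information-unit_m2']
del df['site_information-field_size']

print(df)  # field_area now holds ~0.40 ha and ~0.34 ha respectively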