diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py
index 1c05c999f6a1d309eef7827ab87719dab6a22736..020f5e1851832193365509b25278fa8b2c7f725b 100644
--- a/coordinator/ProcessorSurveys.py
+++ b/coordinator/ProcessorSurveys.py
@@ -33,6 +33,7 @@ from source_gen.clustering import run_case
 
 from ProcessorSurveysODK import get_ODK_form_as_csv
 from ProcessorSurveysODKv2 import get_ODKv2_form_as_csv
+from ProcessorSurveysnewODK import get_newODK_form_as_csv
 from ProcessorSurveyskobotoolbox import get_kobotoolbox_form_as_csv
 from ProcessorSurveysWRSIS import get_WRSIS_form_as_csv
 from ProcessorSurveysWRT import get_WRT_form_as_csv
@@ -50,7 +51,8 @@ GET_FORM_AS_CSV_DICT = {
     'kobotoolbox' : get_kobotoolbox_form_as_csv,
     'WRSIS' : get_WRSIS_form_as_csv,
     'WRT' : get_WRT_form_as_csv,
-    'ODKv2' : get_ODKv2_form_as_csv
+    'ODKv2' : get_ODKv2_form_as_csv,
+    'newODK' : get_newODK_form_as_csv,
 }
 
 def process_pre_job_survey(input_args):
diff --git a/coordinator/ProcessorSurveysnewODK.py b/coordinator/ProcessorSurveysnewODK.py
new file mode 100644
index 0000000000000000000000000000000000000000..a290f7a2daf63c892fbc98799ec36d30b24a5805
--- /dev/null
+++ b/coordinator/ProcessorSurveysnewODK.py
@@ -0,0 +1,224 @@
+#ProcessorSurveysnewODK.py
+"""Functions for parsing wheat rust survey records from the new ODK form on the kobotoolbox server."""
+
+import csv
+import datetime
+import logging
+import os
+from pathlib import Path
+import requests
+
+from shutil import copyfile
+from pandas import DataFrame
+
+from ProcessorSurveyUtils import parse_columns
+from ProcessorUtils import (
+    endJob,
+    add_filters_to_sublogger,
+)
+
+# Named after this module (not the kobotoolbox one) so its records are
+# distinguishable in the shared Processor log.
+logger = logging.getLogger('Processor.Surveys.newODK')
+add_filters_to_sublogger(logger)
+
+def get_from_kobotoolbox(url,form_id,form_token,**kwargs):
+    '''Make the HTTP request, then check the status code.'''
+
+    # Kenya survey form
+    #url = 'https://kf.kobotoolbox.org/'
+    #form_name = 'Wheat rust survey 1.0'
+    #form_id = 'akpyJHvYxkLKPkxFJnPyTW'
+    #form_token = '???' # this is sensitive
+
+    url_for_requests = url #f"{url}api/v2/"
+
+    url_to_get_form = f"{url_for_requests}assets/{form_id}/data.json"
+
+    headers = {'Authorization': f"Token {form_token}"}
+
+    response = requests.get(url_to_get_form,headers=headers)
+
+    if response.status_code != 200:
+        raise requests.exceptions.HTTPError('HTTP status was not 200')
+
+    logger.info('successful connection to kobotoolbox server')
+    return response
+
+def build_dataframe(response):
+    '''Crude merging of list of dicts into pandas dataframe'''
+
+    result_count = response.json()['count']
+
+    logger.info(f"{result_count} records")
+
+    request_results = response.json()['results']
+
+    df = DataFrame.from_records(request_results)
+
+    return df
+
+def get_newODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, status):
+    '''Given a dict with a single kobotoolbox form to download from a kobotoolbox
+    server, obtains it and converts to csv.'''
+
+    output_dir = 'Export_newCSV'
+    output_path = f"{jobPath}/{output_dir}/"
+
+    Path(output_path).mkdir(parents=True, exist_ok=True)
+
+    # get data from kobotoolbox server
+
+    # keys are column names in the input dataframe
+    # values that are the string 'None' mean they should be dropped
+    # values that are string simply rename the column
+    # values that are functions should be run with that key and returns series/dataframe
+    column_parser_dict = {
+        '__version__' : 'None',
+        '_attachments' : 'None',
+        #'_bamboo_dataset_id' : 'None',
+        '_geolocation' : 'None', # looks like a duplication of survey_infromation/location
+        '_id' : 'None',
+        '_notes' : 'None',
+        '_status' : 'None',
+        '_submission_time' : ('parse_date',(('name_out','SubmissionDate'),('fmt_in','%Y-%m-%dT%H:%M:%S'))),
+        '_submitted_by' : 'None',
+        '_tags' : 'None',
+        '_uuid' : 'KEY',
+        '_validation_status' : 'None',
+        '_xform_id_string' : 'None',
+        'comment' : 'comment',
+        'dead_stemrust_samples' : 'SET-OF-dead_stemrust_samples',
+        'dead_stemrust_samples_count' : 'dead_stemrust_samples_count',
+        'dead_yellowrust_samples' : 'SET-OF-dead_yellowrust_samples',
+        'dead_yellowrust_samples_count' : 'dead_yellowrust_samples_count',
+        'deviceid' : 'deviceid',
+        'end' : ('parse_date',(('name_out','end'),('fmt_in','%Y-%m-%dT%H:%M:%S.%f%z'))),
+        'formhub/uuid' : 'None',
+        'imei' : 'imei',
+        'leaf_rust/leafrust_host_plant_reaction' : 'leaf_rust-leafrust_host_plant_reaction',
+        'leaf_rust/leafrust_incidence' : 'leaf_rust-leafrust_incidence',
+        'leaf_rust/leafrust_severity' : 'leaf_rust-leafrust_severity',
+        'live_leafrust_samples' : 'SET-OF-live_leafrust_samples',
+        'live_leafrust_samples_count' : 'live_leafrust_samples_count',
+        'live_stemrust_samples' : 'SET-OF-live_stemrust_samples',
+        'live_stemrust_samples_count' : 'live_stemrust_samples_count',
+        'live_yellowrust_samples' : 'SET-OF-live_yellowrust_samples',
+        'live_yellowrust_samples_count' : 'live_yellowrust_samples_count',
+        'meta/instanceID' : 'meta-instanceID',
+        'samples_collected' : 'samples_collected',
+        'samples_type' : 'samples_type',
+        'septoria/septoria_incidence' : 'septoria-septoria_incidence',
+        'septoria/septoria_severity' : 'septoria-septoria_severity',
+        'site_information/crop' : 'site_information-crop',
+        'site_information/field_area' : 'site_information-field_area',
+        'site_information/growth_stage' : 'site_information-growth_stage',
+        'site_information/survey_site' : 'site_information-survey_site',
+        'site_information/variety' : 'site_information-variety',
+        'start' : ('parse_date',(('name_out','start'),('fmt_in','%Y-%m-%dT%H:%M:%S.%f%z'))),
+        'stem_rust/Stemrust_severity' : 'stem_rust-Stemrust_severity',
+        'stem_rust/stemrust_host_plant_reaction' : 'stem_rust-stemrust_host_plant_reaction',
+        'stem_rust/stemrust_incidence' : 'stem_rust-stemrust_incidence',
+        'survey_infromation/location' : ('parse_location_kobotoolbox',()),
+        'survey_infromation/location_name' : 'survey_infromation-location_name',
+        'survey_infromation/survey_date' : ('parse_date',(('name_out','survey_infromation-survey_date'),('fmt_in','%Y-%m-%d'))),
+        'surveyor_infromation/country' : 'surveyor_infromation-country',
+        'surveyor_infromation/institution' : 'surveyor_infromation-institution',
+        'surveyor_infromation/surveyor_name' : 'surveyor_infromation-surveyor_name',
+        'today' : ('parse_date',(('name_out','today'),('fmt_in','%Y-%m-%d'))),
+        'username' : 'username',
+        'yellow_rust/yellowrust_host_plant_reaction' : 'yellow_rust-yellowrust_host_plant_reaction',
+        'yellow_rust/yellowrust_incidence' : 'yellow_rust-yellowrust_incidence',
+        'yellow_rust/yellowrust_severity' : 'yellow_rust-yellowrust_severity',
+        }
+
+    logger.debug('Performing download')
+
+    # perform a pull from the server, and if it fails write a warning message
+
+    download_success = True
+
+    skip_download: bool = config['Survey'].get('SkipServerDownload', False)
+
+    if not skip_download:
+        try:
+
+            request = get_from_kobotoolbox(**form_credentials)
+
+        except requests.exceptions.RequestException as e:
+            status.reset('WARNING')
+
+            download_success = False
+
+    # define filenames
+    csv_filename = f"SurveyData_{form_credentials['form_id']}.csv"
+
+    csv_processed_filename = "SurveyDataProcessed.csv"
+    csv_processed_path = f"{output_path}/{csv_processed_filename}"
+
+    if download_success and not skip_download:
+        # parse dataframe
+
+        dataframe_raw = build_dataframe(request)
+
+        logger.debug('Saving raw csv file')
+
+        # csv_filename already carries the .csv suffix
+        df_raw_filename = f"{output_path}/{csv_filename}"
+
+        dataframe_raw.to_csv(df_raw_filename,index=False,quoting=csv.QUOTE_MINIMAL)
+
+        # process to match ODK format
+
+        dataframe_processed = parse_columns(dataframe_raw,column_parser_dict)
+
+        logger.debug('Saving processed csv file')
+
+        dataframe_processed.to_csv(csv_processed_path,index=False,quoting=csv.QUOTE_MINIMAL)
+
+    if not download_success or skip_download:
+
+        logger.info("Because server download failed somewhere (or we are skipping downloads), trying to recover by copying recent download")
+
+        copy_success = False
+
+        days_back = 1
+        acceptable_days_back = int(config['Survey']['AcceptableDowntimeDays'])
+        logger.debug(f"Acceptable server downtime is set to {acceptable_days_back} days")
+
+        while ((not copy_success) and (days_back <= acceptable_days_back)):
+
+            current_date = datetime.datetime.strptime(config['StartString'],'%Y%m%d')
+
+            past_date = current_date - datetime.timedelta(days=days_back)
+
+            #past_jobPath = f"{config['WorkspacePathout']}{short_name[component]}_{past_date.strftime('%Y%m%d')}"
+            past_jobPath = f"{config['WorkspacePath']}/SURVEYDATA_{past_date.strftime('%Y%m%d')}"
+
+            past_output_path = f"{past_jobPath}/{output_dir}/"
+
+            try:
+                # check that python or perl coordinator script succeeded for that date
+                success_py = os.path.isfile(f"{past_jobPath}/STATUS_SUCCESS")
+                success_perl = os.path.isfile(f"{past_jobPath}/SURVEYDATA_SUCCESS.txt")
+                assert success_py or success_perl
+
+                past_csv_filename = csv_processed_filename
+
+                logger.info(f"Looking for {past_output_path+past_csv_filename}")
+
+                copyfile(past_output_path+past_csv_filename,csv_processed_path)
+
+                assert os.path.isfile(csv_processed_path)
+
+                copy_success = True
+            except Exception:
+                logger.info(f"Not found a kobotoolbox download in {past_output_path}")
+
+            days_back += 1
+
+        if not copy_success:
+            logger.error("Failed to get a suitable copy of survey data.")
+            status.reset('ERROR')
+            endJob(status,premature=True)
+
+        logger.warning(f"Using download from {past_jobPath}.")
+
+    return csv_processed_path