From baca46e63a26fa5209bfb7a5ce737e42b3ddaf0b Mon Sep 17 00:00:00 2001
From: tm689 <tm689@cam.ac.uk>
Date: Wed, 10 May 2023 12:15:12 +0100
Subject: [PATCH] Feat: ProcessorSurveysWRT

Survey processor to handle WheatRustToolbox download and formatting.
---
 coordinator/ProcessorSurveysWRT.py | 297 +++++++++++++++++++++++++++++
 1 file changed, 297 insertions(+)
 create mode 100644 coordinator/ProcessorSurveysWRT.py

diff --git a/coordinator/ProcessorSurveysWRT.py b/coordinator/ProcessorSurveysWRT.py
new file mode 100644
index 0000000..634eb5e
--- /dev/null
+++ b/coordinator/ProcessorSurveysWRT.py
@@ -0,0 +1,297 @@
+#ProcessorSurveysWRT.py
+"""Functions for parsing wheat rust survey records from the WheatRustToolbox (WRT)."""
+
+import csv
+import datetime
+import logging
+import os
+from pathlib import Path
+import requests
+
+from shutil import copyfile
+from pandas import json_normalize
+
+from ProcessorSurveyUtils import parse_columns
+from ProcessorUtils import (
+        endJob,
+        add_filters_to_sublogger,
+)
+
+logger = logging.getLogger('Processor.Surveys.WRT')
+add_filters_to_sublogger(logger)
+
+# Cases to convert WRT categories to the expected short-name category or value
+cases_incident = {
+    'N':'none',
+    'L':'low',
+    'M':'medium',
+    'H':'high'
+    }
+
+cases_severity = {
+    'N':0,
+    'L':10,
+    'M':30,
+    'H':50
+    }
+
+def get_from_WRT(form_credentials: dict, startDate: str, endDate: str):
+    url = f"{form_credentials['url']}WheatRustAPI/api/Survey"
+    url += '?strStopDate=' + endDate
+    url += '&strStartDate=' + startDate
+    url += '&strArrayCountryID=' + form_credentials['countryID']
+
+    # set up http session
+    session = requests.Session()
+
+    # provide authorisation
+    session.auth = (form_credentials['user'], form_credentials['pass'])
+
+    response = session.get(url)
+
+    # check the HTTP status code (not the code within the response body)
+    if response.status_code == 200:
+        logger.info('HTTP request succeeded OK')
+
+    elif response.status_code == 204:
+        logger.info('HTTP request succeeded OK, but no data available')
+
+    else:
+        logger.error("HTTP request did not succeed, code is {:d}: {:s}".format(response.status_code, response.reason))
+        raise requests.exceptions.HTTPError('HTTP status was not 200')
+
+    return response
+
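+# A minimal sketch of the JSON payload this module assumes the WRT API returns.
+# The field names are inferred from how the response is used elsewhere in this
+# file (request.json()["listObservation"] and the nested "listDisease" records);
+# the exact structure and the example values are assumptions, not WRT API
+# documentation:
+#
+#   {"listObservation": [
+#       {"ObservationID": 12345,
+#        "Country": "Ethiopia",
+#        "Latitude": 9.03,
+#        "Longitude": 38.74,
+#        "CollectionDate": "10/25/2022",
+#        "GrowthStage": "Flowering",
+#        "listDisease": [
+#            {"DiseaseName": "SR", "IncidenceCategory": "M", "SeverityCategory": "M"},
+#            {"DiseaseName": "YR", "IncidenceCategory": "L", "SeverityCategory": "L"}]}]}
+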
+def nested_to_flattened(df):
+    '''WRT rust data is in a nested format, so it requires flattening.
+    To do this, the nested disease data need to be separated into dedicated columns.'''
+
+    # check if the dataframe is empty, if it is then add the raw columns
+    if len(df.index) == 0:
+        logger.info('Recent WRT download is empty.')
+        logger.info('Adding raw columns.')
+        RAW_COLUMNS = [
+            "ObservationID",
+            "OriginalID",
+            "Origin",
+            "Country",
+            "Latitude",
+            "Longitude",
+            "FieldArea",
+            "Cultivar",
+            "CollectionDate",
+            "GrowthStage",
+            "listDisease"]
+        for i in RAW_COLUMNS:
+            df[i] = ""
+
+    # add new columns
+    logger.info('Adding new columns')
+    NEW_COLUMNS = ['imei',
+                   'sample_size-number_yellowrust_live',
+                   'sample_size-number_stemrust_live',
+                   'dead_stemrust_samples_count',
+                   'samples_collected',
+                   'sample_size-number_yellowrust_dead',
+                   'live_leafrust_samples_count',
+                   'other_crop',
+                   'live_yellowrust_samples_count',
+                   'subscriberid',
+                   'sample_size-using_barcode',
+                   'start',
+                   'score_diseases_count',
+                   'phonenumber',
+                   'survey_infromation-location-Accuracy',
+                   'SET-OF-live_yellowrust_samples',
+                   'SET-OF-score_diseases',
+                   'meta-instanceID',
+                   'deviceid',
+                   'end',
+                   'samples_type',
+                   'live_stemrust_samples_count',
+                   'dead_yellowrust_samples_count',
+                   'SET-OF-live_leafrust_samples',
+                   'KEY',
+                   'other_diseases_group-other_diseases',
+                   'survey_infromation-location-Altitude',
+                   'SET-OF-dead_stemrust_samples',
+                   'comment',
+                   'sample_size-number_leafrust_live',
+                   'today',
+                   'SET-OF-dead_yellowrust_samples',
+                   'username',
+                   'SET-OF-live_stemrust_samples',
+                   'sample_size-number_stemrust_dead_dna',
+                   'SubmissionDate',
+                   'survey_infromation-location_name',
+                   'site_information-crop',
+                   'site_information-survey_site',
+                   'surveyor_infromation-institution',
+                   'surveyor_infromation-surveyor_name']
+
+    for i in NEW_COLUMNS:
+        df[i] = ""
+
+    # add dedicated rust columns, with default values
+    NEW_RUST_COLUMNS = {"SR.Incident":"none","SR.Severity":"-9","SR.Reaction":"na",
+                        "LR.Incident":"none","LR.Severity":"-9","LR.Reaction":"na",
+                        "YR.Incident":"none","YR.Severity":"-9","YR.Reaction":"na",
+                        "Septoria.Incident":"none","Septoria.Severity":"0"}
+
+    for column, default in NEW_RUST_COLUMNS.items():
+        df[column] = default
+
+    logger.info('Separating nested information into dedicated columns')
+
+    for index, row in df.iterrows():
+        nested_row = row["listDisease"]
+        for rr in range(len(nested_row)):
+            # separate each nested disease record into its dedicated columns
+            row[nested_row[rr]["DiseaseName"] + ".Incident"] = nested_row[rr]["IncidenceCategory"]
+            row[nested_row[rr]["DiseaseName"] + ".Severity"] = nested_row[rr]["SeverityCategory"]
+        df.loc[index] = row
+
+    return df
+
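+# Illustrative example (assumed values) of the flattening performed above: a row
+# whose listDisease contains
+#     {"DiseaseName": "YR", "IncidenceCategory": "M", "SeverityCategory": "H"}
+# has its dedicated columns overwritten to YR.Incident = "M" and YR.Severity = "H",
+# while diseases absent from listDisease keep the defaults set in NEW_RUST_COLUMNS
+# (e.g. SR.Incident = "none", SR.Severity = "-9").
+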
+def get_WRT_form_as_csv(form_credentials: dict, jobPath: str, config: dict, status):
+    '''Given a dict with a single WRT form to download, obtains it from WRT and converts it to csv.'''
+
+    output_dir = 'Export_WRT'
+    output_path = f"{jobPath}/{output_dir}/"
+
+    Path(output_path).mkdir(parents=True, exist_ok=True)
+
+    # get data from WRT
+
+    # keys are column names in the input dataframe
+    # values of 'None' mean the column should be dropped
+    # values that are strings simply rename the column
+    # values that are tuples name a parser function to run on that column, returning a series/dataframe
+    column_parser_dict = {
+        "ObservationID" : 'KEY',
+        "OriginalID" : 'None',
+        "Origin" : "Origin",
+        "Country" : 'surveyor_infromation-country',
+        "Latitude" : 'survey_infromation-location-Latitude',
+        "Longitude" : 'survey_infromation-location-Longitude',
+        "FieldArea" : 'site_information-field_area',
+        "Cultivar" : 'site_information-variety',
+        "CollectionDate" : ('parse_date',(('name_out','survey_infromation-survey_date'),('fmt_in','%m/%d/%Y'))),
+        "GrowthStage" : 'site_information-growth_stage',
+        "listDisease" : 'None',
+        "YR.Severity" : ('parse_cases',(('name_out','yellow_rust-yellowrust_severity'),('cases', cases_severity),('dtype', int))),
+        "YR.Incident" : ('parse_cases',(('name_out','yellow_rust-yellowrust_incidence'),('cases', cases_incident))),
+        "YR.Reaction" : 'yellow_rust-yellowrust_host_plant_reaction',
+        "SR.Severity" : ('parse_cases',(('name_out','stem_rust-Stemrust_severity'),('cases', cases_severity),('dtype', int))),
+        "SR.Incident" : ('parse_cases',(('name_out','stem_rust-stemrust_incidence'),('cases', cases_incident))),
+        "SR.Reaction" : 'stem_rust-stemrust_host_plant_reaction',
+        "LR.Severity" : ('parse_cases',(('name_out','leaf_rust-leafrust_severity'),('cases', cases_severity),('dtype', int))),
+        "LR.Incident" : ('parse_cases',(('name_out','leaf_rust-leafrust_incidence'),('cases', cases_incident))),
+        "LR.Reaction" : 'leaf_rust-leafrust_host_plant_reaction',
+        "Septoria.Severity" : 'septoria-septoria_severity',
+        "Septoria.Incident" : 'septoria-septoria_incidence'
+        }
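+
+    # Illustrative (assumed) reading of the tuple entries above: an entry such as
+    #   "CollectionDate": ('parse_date', (('name_out', 'survey_infromation-survey_date'), ('fmt_in', '%m/%d/%Y')))
+    # is expected to make parse_columns apply a parse_date-style helper to that
+    # column with those keyword arguments, writing the result to the renamed
+    # output column. The actual dispatch lives in ProcessorSurveyUtils.parse_columns,
+    # so treat this as a sketch of the convention rather than its definition.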
+
+    # perform a pull from the server, and if it fails write a warning message
+
+    download_success = True
+
+    start_date = datetime.datetime.strptime(config['Survey']['SeasonStartString'],'%Y%m%d').strftime('%Y-%m-%d')
+    end_date = datetime.datetime.strptime(config['StartString'], '%Y%m%d').strftime('%Y-%m-%d')
+
+    logger.debug(f'Performing download from WRT between {start_date} and {end_date}')
+
+    skip_download: bool = config['Survey'].get('SkipServerDownload', False)
+
+    if not skip_download:
+        try:
+            request = get_from_WRT(form_credentials,start_date,end_date)
+            #assert "response" in request.json().keys()
+
+        except requests.exceptions.RequestException as e:
+            logger.warning(f"HTTP request to WRT failed:\n{e}")
+            status.reset('WARNING')
+
+            download_success = False
+
+        except AssertionError:
+            logger.warning(f"WRT returned incomplete data:\n{request.json()}")
+            status.reset('WARNING')
+
+            download_success = False
+
+    # define filenames
+    csv_filename = "SurveyData_raw.csv"
+
+    csv_processed_filename = "SurveyDataProcessed.csv"
+    csv_processed_path = f"{output_path}/{csv_processed_filename}"
+
+    if download_success and not skip_download:
+        # parse dataframe
+
+        logger.debug('Saving raw csv file')
+
+        df_raw_filename = f"{output_path}/{csv_filename}"
+        dataframe_raw = json_normalize(request.json()["listObservation"])
+
+        dataframe_raw.to_csv(df_raw_filename,index=False,quoting=csv.QUOTE_MINIMAL)
+
+        # flatten the nested dataframe
+        dataframe_flattened = nested_to_flattened(dataframe_raw)
+
+        # process to match ODK format
+        dataframe_processed = parse_columns(dataframe_flattened,column_parser_dict)
+
+        logger.debug('Saving processed csv file')
+
+        dataframe_processed.to_csv(csv_processed_path,index=False,quoting=csv.QUOTE_MINIMAL)
+
+    if not download_success or skip_download:
+
+        logger.info("Server download failed or was skipped, so trying to recover by copying a recent download")
+
+        copy_success = False
+
+        days_back = 1
+        acceptable_days_back = int(config['Survey']['AcceptableDowntimeDays'])
+        logger.debug(f"Acceptable server downtime is set to {acceptable_days_back} days")
+
+        while ((not copy_success) and (days_back <= acceptable_days_back)):
+
+            current_date = datetime.datetime.strptime(config['StartString'],'%Y%m%d')
+
+            past_date = current_date - datetime.timedelta(days=days_back)
+
+            #past_jobPath = f"{config['WorkspacePathout']}{short_name[component]}_{past_date.strftime('%Y%m%d')}"
+            past_jobPath = f"{config['WorkspacePath']}/SURVEYDATA_{past_date.strftime('%Y%m%d')}"
+
+            past_output_path = f"{past_jobPath}/{output_dir}/"
+
+            try:
+                # check that the Python or Perl coordinator script succeeded for that date
+                success_py = os.path.isfile(f"{past_jobPath}/STATUS_SUCCESS")
+                success_perl = os.path.isfile(f"{past_jobPath}/SURVEYDATA_SUCCESS.txt")
+                assert success_py or success_perl
+
+                past_csv_filename = csv_processed_filename
+
+                logger.info(f"Looking for {past_output_path+past_csv_filename}")
+
+                copyfile(past_output_path+past_csv_filename,csv_processed_path)
+
+                assert os.path.isfile(csv_processed_path)
+
+                copy_success = True
+            except Exception:
+                logger.info(f"Did not find a usable WRT download in {past_output_path}")
+
+                days_back += 1
+
+        if not copy_success:
+            logger.error("Failed to get a suitable copy of survey data.")
+            status.reset('ERROR')
+            endJob(status,premature=True)
+
+        logger.warning(f"Using download from {past_jobPath}.")
+
+    return csv_processed_path
-- 
GitLab
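For context, a minimal usage sketch (not part of the patch) of how a coordinator script might call this processor. The key names for form_credentials and config, and the status object with a reset() method, are inferred from how get_WRT_form_as_csv uses them above; the concrete values, paths, and the DummyStatus stand-in are illustrative assumptions only.

    from ProcessorSurveysWRT import get_WRT_form_as_csv

    # Illustrative values only; real credentials and paths come from the coordinator's config.
    form_credentials = {
        'url': 'https://wrt.example.org/',  # base URL; 'WheatRustAPI/api/Survey' is appended by get_from_WRT
        'user': 'api_user',
        'pass': 'api_password',
        'countryID': '1',                   # forwarded as the strArrayCountryID query parameter
    }

    config = {
        'StartString': '20230510',             # job date, YYYYMMDD
        'WorkspacePath': '/workspace',         # used to locate previous SURVEYDATA_* jobs
        'Survey': {
            'SeasonStartString': '20221001',   # start of the survey download window
            'AcceptableDowntimeDays': 7,
            'SkipServerDownload': False,
        },
    }

    class DummyStatus:
        '''Stand-in for the coordinator status object; only reset() is required here.'''
        def reset(self, level):
            print(f'status set to {level}')

    csv_path = get_WRT_form_as_csv(form_credentials, '/workspace/SURVEYDATA_20230510', config, DummyStatus())
    print(csv_path)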