Commit baca46e6 authored by Dr T. Mona

Feat: ProcessorSurveysWRT

Survey processor to handle WheatRustToolbox download and formatting.
parent 026708e7
#ProcessorSurveysWRT.py
"""Functions for parsing wheat rust survey records from the WheatRustToolbox (WRT)."""
import csv
import datetime
import logging
import os
from pathlib import Path
import requests
from shutil import copyfile
from pandas import json_normalize
from ProcessorSurveyUtils import parse_columns
from ProcessorUtils import (
    endJob,
    add_filters_to_sublogger,
)
logger = logging.getLogger('Processor.Surveys.WRT')
add_filters_to_sublogger(logger)
# Cases to convert WRT categories to expected short name category or value
cases_incident = {
    'N': 'none',
    'L': 'low',
    'M': 'medium',
    'H': 'high'
}
cases_severity = {
    'N': 0,
    'L': 10,
    'M': 30,
    'H': 50
}
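
# Illustrative mapping (hypothetical record): a WRT disease entry scored 'M'
# ends up as incidence 'medium' and severity 30 once the 'parse_cases' handler
# referenced in column_parser_dict below applies these dictionaries, e.g.
#   cases_incident['M']  ->  'medium'
#   cases_severity['M']  ->  30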
def get_from_WRT(form_credentials: dict, startDate: str, endDate: str):
    '''Makes the HTTP request to the WRT server and returns the response.'''

    url = f"{form_credentials['url']}WheatRustAPI/api/Survey"
    url += '?strStopDate=' + endDate
    url += '&strStartDate=' + startDate
    url += '&strArrayCountryID=' + form_credentials['countryID']

    # set up http session
    session = requests.Session()
    # provide authorisation
    session.auth = (form_credentials['user'], form_credentials['pass'])

    response = session.get(url)

    # check the HTTP status code (not any code within the response body)
    if response.status_code == 200:
        logger.info('HTTP request succeeded OK')
    elif response.status_code == 204:
        logger.info('HTTP request succeeded OK, but no data available')
    else:
        logger.warning(f"HTTP request failed with code {response.status_code:d}: {response.reason:s}")
        raise requests.exceptions.HTTPError('HTTP status was not 200 or 204')

    return response
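
# Minimal usage sketch for get_from_WRT (the credential values below are
# placeholders, not real accounts; the dict keys and the 'listObservation'
# response key are taken from how this module uses them):
#   creds = {'url': 'https://wrt.example.org/', 'countryID': '1',
#            'user': 'someuser', 'pass': 'somepass'}
#   response = get_from_WRT(creds, '2023-01-01', '2023-06-01')
#   records = response.json()['listObservation']  # only when status is 200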
def nested_to_flattened(df):
    '''WRT rust data is nested, so it requires flattening. To do this, the
    nested data are separated into dedicated columns.'''

    # check if the dataframe is empty; if it is, add the raw columns
    if len(df.index) == 0:
        logger.info('Recent WRT download is empty.')
        logger.info('Adding raw columns.')
        RAW_COLUMNS = [
            "ObservationID",
            "OriginalID",
            "Origin",
            "Country",
            "Latitude",
            "Longitude",
            "FieldArea",
            "Cultivar",
            "CollectionDate",
            "GrowthStage",
            "listDisease"]

        for i in RAW_COLUMNS:
            df[i] = ""
    # add new columns
    # (spellings such as 'infromation' are kept as-is; they match the field
    # names of the target survey format)
    logger.info('Adding new columns')
    NEW_COLUMNS = [
        'imei',
        'sample_size-number_yellowrust_live',
        'sample_size-number_stemrust_live',
        'dead_stemrust_samples_count',
        'samples_collected',
        'sample_size-number_yellowrust_dead',
        'live_leafrust_samples_count',
        'other_crop',
        'live_yellowrust_samples_count',
        'subscriberid',
        'sample_size-using_barcode',
        'start',
        'score_diseases_count',
        'phonenumber',
        'survey_infromation-location-Accuracy',
        'SET-OF-live_yellowrust_samples',
        'SET-OF-score_diseases',
        'meta-instanceID',
        'deviceid',
        'end',
        'samples_type',
        'live_stemrust_samples_count',
        'dead_yellowrust_samples_count',
        'SET-OF-live_leafrust_samples',
        'KEY',
        'other_diseases_group-other_diseases',
        'survey_infromation-location-Altitude',
        'SET-OF-dead_stemrust_samples',
        'comment',
        'sample_size-number_leafrust_live',
        'today',
        'SET-OF-dead_yellowrust_samples',
        'username',
        'SET-OF-live_stemrust_samples',
        'sample_size-number_stemrust_dead_dna',
        'SubmissionDate',
        'survey_infromation-location_name',
        'site_information-crop',
        'site_information-survey_site',
        'surveyor_infromation-institution',
        'surveyor_infromation-surveyor_name']

    for i in NEW_COLUMNS:
        df[i] = ""
    # add dedicated rust columns, with default values
    NEW_RUST_COLUMNS = {
        "SR.Incident": "none", "SR.Severity": "-9", "SR.Reaction": "na",
        "LR.Incident": "none", "LR.Severity": "-9", "LR.Reaction": "na",
        "YR.Incident": "none", "YR.Severity": "-9", "YR.Reaction": "na",
        "Septoria.Incident": "none", "Septoria.Severity": "0"}

    for i in NEW_RUST_COLUMNS.keys():
        df[i] = NEW_RUST_COLUMNS[i]
    logger.info('Separating nested information into dedicated columns')

    for index, row in df.iterrows():
        nested_row = row["listDisease"]
        for rr in range(len(nested_row)):
            # separate nested information into the dedicated columns
            row[nested_row[rr]["DiseaseName"] + ".Incident"] = nested_row[rr]["IncidenceCategory"]
            row[nested_row[rr]["DiseaseName"] + ".Severity"] = nested_row[rr]["SeverityCategory"]
        df.loc[index] = row

    return df
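
# Sketch of the transformation performed above, on a single hypothetical
# record (only the keys used by the loop are shown):
#   df = json_normalize([{"ObservationID": 1,
#                         "listDisease": [{"DiseaseName": "YR",
#                                          "IncidenceCategory": "M",
#                                          "SeverityCategory": "M"}]}])
#   flattened = nested_to_flattened(df)
#   # flattened.loc[0, "YR.Incident"] == "M" and
#   # flattened.loc[0, "YR.Severity"] == "M"; the category-to-value conversion
#   # happens later, when parse_columns applies column_parser_dict.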
def get_WRT_form_as_csv(form_credentials: dict, jobPath: str, config: dict, status):
    '''Given a dict with credentials for a single WRT form, downloads it from
    WRT and converts it to csv.'''

    output_dir = 'Export_WRT'
    output_path = f"{jobPath}/{output_dir}/"

    Path(output_path).mkdir(parents=True, exist_ok=True)

    # get data from WRT
    # keys are column names in the input dataframe
    # values that are None mean the column should be dropped
    # values that are strings simply rename the column
    # values that are tuples name a parser function to run on that column,
    # along with its arguments; the parser returns a series/dataframe
    column_parser_dict = {
        "ObservationID" : 'KEY',
        "OriginalID" : None,
        "Origin" : "Origin",
        "Country" : 'surveyor_infromation-country',
        "Latitude" : 'survey_infromation-location-Latitude',
        "Longitude" : 'survey_infromation-location-Longitude',
        "FieldArea" : 'site_information-field_area',
        "Cultivar" : 'site_information-variety',
        "CollectionDate" : ('parse_date',(('name_out','survey_infromation-survey_date'),('fmt_in','%m/%d/%Y'))),
        "GrowthStage" : 'site_information-growth_stage',
        "listDisease" : None,
        "YR.Severity" : ('parse_cases',(('name_out','yellow_rust-yellowrust_severity'),('cases', cases_severity),('dtype', int))),
        "YR.Incident" : ('parse_cases',(('name_out','yellow_rust-yellowrust_incidence'),('cases', cases_incident))),
        "YR.Reaction" : 'yellow_rust-yellowrust_host_plant_reaction',
        "SR.Severity" : ('parse_cases',(('name_out','stem_rust-Stemrust_severity'),('cases', cases_severity),('dtype', int))),
        "SR.Incident" : ('parse_cases',(('name_out','stem_rust-stemrust_incidence'),('cases', cases_incident))),
        "SR.Reaction" : 'stem_rust-stemrust_host_plant_reaction',
        "LR.Severity" : ('parse_cases',(('name_out','leaf_rust-leafrust_severity'),('cases', cases_severity),('dtype', int))),
        "LR.Incident" : ('parse_cases',(('name_out','leaf_rust-leafrust_incidence'),('cases', cases_incident))),
        "LR.Reaction" : 'leaf_rust-leafrust_host_plant_reaction',
        "Septoria.Severity" : 'septoria-septoria_severity',
        "Septoria.Incident" : 'septoria-septoria_incidence'
    }
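
    # For example, the "YR.Severity" entry above is assumed to be unpacked by
    # parse_columns (from ProcessorSurveyUtils) into a call of roughly this
    # shape:
    #   parse_cases(df["YR.Severity"],
    #               name_out='yellow_rust-yellowrust_severity',
    #               cases=cases_severity, dtype=int)
    # i.e. a category like 'H' becomes 50 in a column renamed to the
    # ODK-style name.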
    # perform a pull from the server, and if it fails write a warning message
    download_success = True

    start_date = datetime.datetime.strptime(config['Survey']['SeasonStartString'], '%Y%m%d').strftime('%Y-%m-%d')
    end_date = datetime.datetime.strptime(config['StartString'], '%Y%m%d').strftime('%Y-%m-%d')

    logger.debug(f'Performing download from WRT between {start_date} and {end_date}')

    skip_download: bool = config['Survey'].get('SkipServerDownload', False)

    if not skip_download:
        try:
            request = get_from_WRT(form_credentials, start_date, end_date)
            #assert "response" in request.json().keys()
        except requests.exceptions.RequestException as e:
            logger.warning(f"Request to WRT server failed: {e}")
            status.reset('WARNING')
            download_success = False
        except AssertionError:
            logger.warning(f"WRT returned incomplete data:\n{request.json()}")
            status.reset('WARNING')
            download_success = False
    # define filenames
    csv_filename = "SurveyData_raw.csv"
    csv_processed_filename = "SurveyDataProcessed.csv"
    csv_processed_path = f"{output_path}/{csv_processed_filename}"
    if download_success and not skip_download:
        # parse dataframe
        logger.debug('Saving raw csv file')
        df_raw_filename = f"{output_path}/{csv_filename}"
        dataframe_raw = json_normalize(request.json()["listObservation"])
        dataframe_raw.to_csv(df_raw_filename, index=False, quoting=csv.QUOTE_MINIMAL)

        # flatten the nested dataframe
        dataframe_flattened = nested_to_flattened(dataframe_raw)

        # process to match ODK format
        dataframe_processed = parse_columns(dataframe_flattened, column_parser_dict)

        logger.debug('Saving processed csv file')
        dataframe_processed.to_csv(csv_processed_path, index=False, quoting=csv.QUOTE_MINIMAL)
    if not download_success or skip_download:
        logger.info("Server download failed or was skipped; trying to recover by copying a recent download")

        copy_success = False
        days_back = 1
        acceptable_days_back = int(config['Survey']['AcceptableDowntimeDays'])
        logger.debug(f"Acceptable server downtime is set to {acceptable_days_back} days")

        while (not copy_success) and (days_back <= acceptable_days_back):
            current_date = datetime.datetime.strptime(config['StartString'], '%Y%m%d')
            past_date = current_date - datetime.timedelta(days=days_back)

            #past_jobPath = f"{config['WorkspacePathout']}{short_name[component]}_{past_date.strftime('%Y%m%d')}"
            past_jobPath = f"{config['WorkspacePath']}/SURVEYDATA_{past_date.strftime('%Y%m%d')}"
            past_output_path = f"{past_jobPath}/{output_dir}/"

            try:
                # check that the python or perl coordinator script succeeded for that date
                success_py = os.path.isfile(f"{past_jobPath}/STATUS_SUCCESS")
                success_perl = os.path.isfile(f"{past_jobPath}/SURVEYDATA_SUCCESS.txt")
                assert success_py or success_perl

                past_csv_filename = csv_processed_filename

                logger.info(f"Looking for {past_output_path + past_csv_filename}")

                copyfile(past_output_path + past_csv_filename, csv_processed_path)

                assert os.path.isfile(csv_processed_path)

                copy_success = True
            except (AssertionError, OSError):
                logger.info(f"Did not find a WRT download in {past_output_path}")
                days_back += 1

        if not copy_success:
            logger.error("Failed to get a suitable copy of survey data.")
            status.reset('ERROR')
            endJob(status, premature=True)

        logger.warning(f"Using download from {past_jobPath}.")

    return csv_processed_path
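
# Minimal usage sketch for get_WRT_form_as_csv (the paths and config values
# below are placeholders; 'creds' is the credentials dict described above and
# 'status' is whatever status object the coordinator passes in):
#   config = {'StartString': '20230601',
#             'WorkspacePath': '/workspace',
#             'Survey': {'SeasonStartString': '20230101',
#                        'AcceptableDowntimeDays': 5}}
#   csv_path = get_WRT_form_as_csv(creds, '/path/to/job', config, status)
#   # csv_path is <jobPath>/Export_WRT/SurveyDataProcessed.csv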