#ProcessorSurveysWRSIS.py
"""Functions for parsing wheat rust survey records from WRSIS."""

import csv
import datetime
import logging
import os

from pathlib import Path
from shutil import copyfile

import requests
from pandas import json_normalize

from ProcessorSurveyUtils import parse_columns
from ProcessorUtils import (
        endJob,
        add_filters_to_sublogger,
)

logger = logging.getLogger('Processor.Surveys.WRSIS')
add_filters_to_sublogger(logger)


def get_from_WRSIS(form_credentials: dict, startDate: str, endDate: str):
    '''Make the HTTP request, then check the status code.'''

    date_params = {
        'fromDate': startDate,
        'toDate': endDate}

    # set up http session
    session = requests.Session()

    # provide authorisation
    session.auth = (form_credentials['user'], form_credentials['pass'])

    response = session.post(f"{form_credentials['url']}getUKMetSurveyData", json=date_params)

    # possible HTTP responses as provided in the API document
    # (I've seen some other responses though, e.g. 415)
    # It seems there is another layer of status codes
    status_codes = {
            200 : 'OK',
            201 : 'Created',
            202 : 'Accepted (Request accepted, and queued for execution)',
            400 : 'Bad request',
            401 : 'Authentication failure',
            403 : 'Forbidden',
            404 : 'Resource not found',
            405 : 'Method Not Allowed',
            409 : 'Conflict',
            412 : 'Precondition Failed',
            413 : 'Request Entity Too Large',
            500 : 'Internal Server Error',
            501 : 'Not Implemented',
            503 : 'Service Unavailable'}

    # checking the HTTP status code (not the code in the response)
    if response.status_code == 200:
        logger.info('HTTP request succeeded OK')

    elif response.status_code in status_codes:
        logger.info("HTTP response did not succeed OK, code is {:d}: {:s}".format(response.status_code, status_codes[response.status_code]))
        raise requests.exceptions.HTTPError('HTTP status was not 200')

    else:
        logger.info("HTTP response did not succeed OK, unknown code {:d}".format(response.status_code))
        raise requests.exceptions.HTTPError('HTTP status was not 200')

    return response


def categorize_incident(incident):
    '''Convert an incident value into a category string.
    TODO: float values are not handled'''

    try:
        incident_value = int(incident)

        if 0 < incident_value <= 20:
            incident_category = "low"
        elif 20 < incident_value <= 40:
            incident_category = "medium"
        elif 40 < incident_value <= 100:
            incident_category = "high"
        else:
            incident_category = "none"

    except (ValueError, TypeError):
        if incident.lower() in ["low", "medium", "high", "none", "na"]:
            incident_category = incident.lower()
        else:
            incident_category = "none"

    return incident_category
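
# A minimal sanity-check sketch of the banding above. This helper is an
# illustrative addition for documentation and is not called by the pipeline.
def _demo_categorize_incident():
    assert categorize_incident("15") == "low"         # 0 < value <= 20
    assert categorize_incident("35") == "medium"      # 20 < value <= 40
    assert categorize_incident("85") == "high"        # 40 < value <= 100
    assert categorize_incident("0") == "none"         # zero, negatives and >100 map to "none"
    assert categorize_incident("Medium") == "medium"  # category strings are lowercased
    assert categorize_incident("banana") == "none"    # unrecognised strings default to "none"
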
def nested_to_flattened(df):
    '''WRSIS rust data is in a nested format, so it requires flattening.
    To do this, the nested data need to be separated into dedicated columns.'''

    # check if the dataframe is empty, if it is then add the raw columns
    if len(df.index) == 0:
        logger.info('Recent WRSIS download is empty.')
        logger.info('Adding raw columns.')
        RAW_COLUMNS = [
            'Rust Details',
            'Other Disease',
            'Sample Details',
            'Survey Details.Survey No',
            'Survey Details.Latitude',
            'Survey Details.First Rust Observation Date',
            'Survey Details.Longitude',
            'Survey Details.Kebele Name',
            'Survey Details.Publish Date',
            'Survey Details.Region Name',
            'Survey Details.Survey Date',
            'Survey Details.Season',
            'Survey Details.Planting Date',
            'Survey Details.Woreda Name',
            'Survey Details.Location other details',
            'Survey Details.Tillering Date',
            'Survey Details.Zone Name',
            'Survey Other Details.Moisture',
            'Survey Other Details.Soil colour',
            'Survey Other Details.Weed Control',
            'Survey Other Details.Irrigated',
            'Site Information.Wheat Type',
            'Site Information.Growth Stage',
            'Site Information.Varity Name',
            'Site Information.Survey Site',
            'Site Information.Site Area',
            'Surveyor Details.Surveyors',
            'Surveyor Details.Country',
            'Surveyor Details.Other Surveyors',
            'Surveyor Details.Institution Name',
            'Fungicide Details.Fungicide Name',
            'Fungicide Details.Spray Date',
            'Fungicide Details.EffectiveNess',
            'Fungicide Details.Used Dose']

        for i in RAW_COLUMNS:
            df[i] = ""

    # add new columns
    logger.info('Adding new columns')
    NEW_COLUMNS = [
        'imei',
        'sample_size-number_yellowrust_live',
        'sample_size-number_stemrust_live',
        'dead_stemrust_samples_count',
        'samples_collected',
        'sample_size-number_yellowrust_dead',
        'live_leafrust_samples_count',
        'other_crop',
        'live_yellowrust_samples_count',
        'subscriberid',
        'sample_size-using_barcode',
        'start',
        'score_diseases_count',
        'phonenumber',
        'survey_infromation-location-Accuracy',
        'SET-OF-live_yellowrust_samples',
        'SET-OF-score_diseases',
        'meta-instanceID',
        'deviceid',
        'end',
        'samples_type',
        'live_stemrust_samples_count',
        'dead_yellowrust_samples_count',
        'SET-OF-live_leafrust_samples',
        'other_diseases_group-other_diseases',
        'survey_infromation-location-Altitude',
        'SET-OF-dead_stemrust_samples',
        'comment',
        'sample_size-number_leafrust_live',
        'today',
        'SET-OF-dead_yellowrust_samples',
        'username',
        'SET-OF-live_stemrust_samples',
        'sample_size-number_stemrust_dead_dna']

    for i in NEW_COLUMNS:
        df[i] = ""

    # add dedicated rust columns, with default values
    NEW_RUST_COLUMNS = {
        "Stem Rust.Incident":"none", "Stem Rust.Severity":"-9", "Stem Rust.Reaction":"na",
        "Leaf Rust.Incident":"none", "Leaf Rust.Severity":"-9", "Leaf Rust.Reaction":"na",
        "Yellow Rust.Incident":"none", "Yellow Rust.Severity":"-9", "Yellow Rust.Reaction":"na",
        "Septoria.Incident":"none", "Septoria.Severity":"0"}

    for col, default in NEW_RUST_COLUMNS.items():
        df[col] = default

    logger.info('Separating nested information into dedicated columns')

    for index, row in df.iterrows():
        nested_row = row["Rust Details"]
        for rr in range(len(nested_row)):
            # separating nested information into the dedicated columns
            row[nested_row[rr]["Rust Type"] + ".Incident"] = categorize_incident(nested_row[rr]["Incident"])
            row[nested_row[rr]["Rust Type"] + ".Severity"] = nested_row[rr]["Severity"]
            row[nested_row[rr]["Rust Type"] + ".Reaction"] = nested_row[rr]["Reaction"]

        df.loc[index] = row

    return df
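
# An illustrative sketch of the flattening, on a one-row frame with the nested
# 'Rust Details' shape this module expects from WRSIS. The record values are
# assumed examples; this helper is documentation only and is not called by the
# pipeline.
def _demo_nested_to_flattened():
    import pandas as pd
    df = pd.DataFrame([{
        "Rust Details": [
            {"Rust Type": "Stem Rust", "Incident": "35", "Severity": "30", "Reaction": "MS"},
        ],
    }])
    flat = nested_to_flattened(df)
    # the nested entry now sits in its dedicated columns...
    assert flat.loc[0, "Stem Rust.Incident"] == "medium"
    assert flat.loc[0, "Stem Rust.Severity"] == "30"
    # ...and rusts that were not reported keep their default values
    assert flat.loc[0, "Yellow Rust.Incident"] == "none"
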
def get_WRSIS_form_as_csv(form_credentials: dict, jobPath: str, config: dict, status):
    '''Given a dict of credentials for a single WRSIS form, downloads the
    survey data and converts it to csv.'''

    output_dir = 'Export_WRSIS'
    output_path = f"{jobPath}/{output_dir}/"

    Path(output_path).mkdir(parents=True, exist_ok=True)

    # get data from WRSIS

    # keys are column names in the input dataframe
    # values of 'None' mean the column should be dropped
    # values that are strings simply rename the column
    # values that are tuples name a parser function to run on that column,
    # returning a series/dataframe (see the sketch after the dict)
    column_parser_dict = {
        'Rust Details' : 'None',
        'Other Disease' : 'None',
        'Sample Details' : 'None',
        'Survey Details.Survey No' : 'KEY',
        'Survey Details.Latitude' : 'survey_infromation-location-Latitude',
        'Survey Details.First Rust Observation Date' : 'None',
        'Survey Details.Longitude' : 'survey_infromation-location-Longitude',
        'Survey Details.Kebele Name' : 'None',
        'Survey Details.Publish Date' : ('parse_date',(('name_out','SubmissionDate'),('fmt_in','%d-%b-%Y'))),
        'Survey Details.Region Name' : 'None',
        'Survey Details.Survey Date' : ('parse_date',(('name_out','survey_infromation-survey_date'),('fmt_in','%d-%b-%Y'))),
        'Survey Details.Season' : 'None',
        'Survey Details.Planting Date' : 'None',
        'Survey Details.Woreda Name' : 'None',
        'Survey Details.Location other details' : 'None',
        'Survey Details.Tillering Date' : 'None',
        'Survey Details.Zone Name' : 'survey_infromation-location_name',
        'Survey Other Details.Moisture' : 'None',
        'Survey Other Details.Soil colour' : 'None',
        'Survey Other Details.Weed Control' : 'None',
        'Survey Other Details.Irrigated' : 'None',
        'Site Information.Wheat Type' : 'site_information-crop',
        'Site Information.Growth Stage' : 'site_information-growth_stage',
        'Site Information.Varity Name' : 'site_information-variety',
        'Site Information.Survey Site' : 'site_information-survey_site',
        'Site Information.Site Area' : 'site_information-field_area',
        'Surveyor Details.Surveyors' : 'surveyor_infromation-surveyor_name',
        'Surveyor Details.Country' : 'surveyor_infromation-country',
        'Surveyor Details.Institution Name' : 'surveyor_infromation-institution',
        'Surveyor Details.Other Surveyors' : 'None',
        #'Fungicide Details.Fungicide Name' : 'None',
        #'Fungicide Details.Spray Date' : 'None',
        #'Fungicide Details.EffectiveNess' : 'None',
        #'Fungicide Details.Used Dose' : 'None',
        "Yellow Rust.Severity" : 'yellow_rust-yellowrust_severity',
        "Yellow Rust.Incident" : 'yellow_rust-yellowrust_incidence',
        "Yellow Rust.Reaction" : 'yellow_rust-yellowrust_host_plant_reaction',
        "Stem Rust.Severity" : 'stem_rust-Stemrust_severity',
        "Stem Rust.Incident" : 'stem_rust-stemrust_incidence',
        "Stem Rust.Reaction" : 'stem_rust-stemrust_host_plant_reaction',
        "Leaf Rust.Severity" : 'leaf_rust-leafrust_severity',
        "Leaf Rust.Incident" : 'leaf_rust-leafrust_incidence',
        "Leaf Rust.Reaction" : 'leaf_rust-leafrust_host_plant_reaction',
        "Septoria.Severity" : 'septoria-septoria_severity',
        "Septoria.Incident" : 'septoria-septoria_incidence'
    }
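
    # For illustration, the tuple form above is interpreted roughly as follows
    # (assumed semantics of parse_columns, defined in ProcessorSurveyUtils):
    #   ('parse_date',(('name_out','SubmissionDate'),('fmt_in','%d-%b-%Y')))
    # renames the column to 'SubmissionDate' and re-parses values such as
    # '05-Mar-2023' from the '%d-%b-%Y' input format.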
    # perform a pull from the server, and if it fails write a warning message
    download_success = True

    start_date = datetime.datetime.strptime(config['Survey']['SeasonStartString'], '%Y%m%d').strftime('%d-%m-%Y')
    end_date = datetime.datetime.strptime(config['StartString'], '%Y%m%d').strftime('%d-%m-%Y')

    logger.debug(f'Performing download from WRSIS between {start_date} and {end_date}')

    skip_download: bool = config['Survey'].get('SkipServerDownload', False)

    if not skip_download:
        try:
            request = get_from_WRSIS(form_credentials, start_date, end_date)
            assert "response" in request.json().keys()

        except requests.exceptions.RequestException as e:
            logger.warning(f"Request to WRSIS failed:\n{e}")
            status.reset('WARNING')
            download_success = False

        except AssertionError:
            logger.warning(f"WRSIS returned incomplete data:\n{request.json()}")
            status.reset('WARNING')
            download_success = False

    # define filenames
    csv_filename = "SurveyData_raw.csv"
    csv_processed_filename = "SurveyDataProcessed.csv"
    csv_processed_path = f"{output_path}/{csv_processed_filename}"

    if download_success and not skip_download:
        # parse dataframe
        logger.debug('Saving raw csv file')
        df_raw_filename = f"{output_path}/{csv_filename}"
        dataframe_raw = json_normalize(request.json()["response"]["Rust Survey Data"])
        dataframe_raw.to_csv(df_raw_filename, index=False, quoting=csv.QUOTE_MINIMAL)

        # flatten the nested dataframe
        dataframe_flattened = nested_to_flattened(dataframe_raw)

        # process to match ODK format
        dataframe_processed = parse_columns(dataframe_flattened, column_parser_dict)

        logger.debug('Saving processed csv file')
        dataframe_processed.to_csv(csv_processed_path, index=False, quoting=csv.QUOTE_MINIMAL)

    if not download_success or skip_download:

        logger.info("Server download failed (or downloads are being skipped), trying to recover by copying a recent download")

        copy_success = False

        days_back = 1
        acceptable_days_back = int(config['Survey']['AcceptableDowntimeDays'])
        logger.debug(f"Acceptable server downtime is set to {acceptable_days_back} days")

        while ((not copy_success) and (days_back <= acceptable_days_back)):

            current_date = datetime.datetime.strptime(config['StartString'], '%Y%m%d')
            past_date = current_date - datetime.timedelta(days=days_back)

            #past_jobPath = f"{config['WorkspacePathout']}{short_name[component]}_{past_date.strftime('%Y%m%d')}"
            past_jobPath = f"{config['WorkspacePath']}/SURVEYDATA_{past_date.strftime('%Y%m%d')}"
            past_output_path = f"{past_jobPath}/{output_dir}/"

            try:
                # check that python or perl coordinator script succeeded for that date
                success_py = os.path.isfile(f"{past_jobPath}/STATUS_SUCCESS")
                success_perl = os.path.isfile(f"{past_jobPath}/SURVEYDATA_SUCCESS.txt")
                assert success_py or success_perl

                past_csv_filename = csv_processed_filename

                logger.info(f"Looking for {past_output_path+past_csv_filename}")

                copyfile(past_output_path+past_csv_filename, csv_processed_path)

                assert os.path.isfile(csv_processed_path)

                copy_success = True

            except (AssertionError, OSError):
                logger.info(f"Did not find a WRSIS download in {past_output_path}")
                days_back += 1

        if not copy_success:
            logger.error("Failed to get a suitable copy of survey data.")
            status.reset('ERROR')
            endJob(status, premature=True)

        logger.warning(f"Using download from {past_jobPath}.")

    return csv_processed_path
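

# A minimal manual-test sketch. The credentials, URL and dates below are
# assumed placeholders (replace with real values before running); this block
# is illustrative only and is not part of the pipeline.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    demo_credentials = {
        'user': 'wrsis-user',                     # placeholder
        'pass': 'wrsis-pass',                     # placeholder
        'url': 'https://example.org/wrsis/api/',  # placeholder; note the trailing
                                                  # slash, as the endpoint name is
                                                  # appended directly
    }
    # dates use the dd-mm-YYYY format that get_WRSIS_form_as_csv builds above
    demo_response = get_from_WRSIS(demo_credentials, '01-01-2023', '31-01-2023')
    print(demo_response.json())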