#ProcessorSurveyskobotoolbox.py
"""Functions for parsing wheat rust survey records from the new ODK form on the kobotoolbox server."""

import csv
import datetime
import logging
import os
from pathlib import Path
from shutil import copyfile

import requests
from pandas import DataFrame

from ProcessorSurveyUtils import parse_columns
from ProcessorUtils import (
        endJob,
        add_filters_to_sublogger,
)

logger = logging.getLogger('Processor.Surveys.kobotoolbox')
add_filters_to_sublogger(logger)

# Cases to convert incidence categories to the expected short name category or value
cases_incident = {
        '' : 'none',
        'N' : 'none',
        'L' : 'low',
        'M' : 'medium',
        'H' : 'high',
        'none' : 'none',
        'low' : 'low',
        'medium' : 'medium',
        'high' : 'high'}

def get_from_kobotoolbox(url, form_id, form_token, **kwargs):
    '''Makes the HTTP request, then checks the status code.'''

    # Kenya survey form
    #url = 'https://kf.kobotoolbox.org/'
    #form_name = 'Wheat rust survey 1.0'
    #form_id = 'akpyJHvYxkLKPkxFJnPyTW'
    #form_token = '???' # this is sensitive

    url_for_requests = url #f"{url}api/v2/"

    url_to_get_form = f"{url_for_requests}assets/{form_id}/data.json"

    headers = {'Authorization': f"Token {form_token}"}

    response = requests.get(url_to_get_form, headers=headers)

    if response.status_code != 200:
        raise requests.exceptions.HTTPError('HTTP status was not 200')

    logger.info('successful connection to kobotoolbox server')

    return response

def build_dataframe(response):
    '''Crude merging of a list of dicts into a pandas dataframe.'''

    result_count = response.json()['count']

    logger.info(f"{result_count} records")

    request_results = response.json()['results']

    df = DataFrame.from_records(request_results)

    return df
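# A minimal usage sketch for the two helpers above (illustrative only: the
# form_id is the Kenya form noted in the comments above, and the token is a
# placeholder, since real tokens are sensitive):
#
#   response = get_from_kobotoolbox(
#           url='https://kf.kobotoolbox.org/',
#           form_id='akpyJHvYxkLKPkxFJnPyTW',
#           form_token='REPLACE_WITH_API_TOKEN')
#   df = build_dataframe(response)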
def get_newODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, status):
    '''Given a dict with a single kobotoolbox form to download from a
    kobotoolbox server, obtains it and converts to csv.'''

    output_dir = 'Export_newCSV'
    output_path = f"{jobPath}/{output_dir}/"

    Path(output_path).mkdir(parents=True, exist_ok=True)

    # how to parse each column of the data from the kobotoolbox server:
    # - keys are column names in the input dataframe
    # - values that are 'None' mean the column should be dropped
    # - values that are any other string simply rename the column
    # - values that are tuples name a parsing function to run for that key,
    #   which returns a series/dataframe
    column_parser_dict = {
            '__version__' : 'None',
            '_attachments' : 'None',
            #'_bamboo_dataset_id' : 'None',
            '_geolocation' : 'None', # looks like a duplication of survey_infromation/location
            '_id' : 'None',
            '_notes' : 'None',
            '_status' : 'None',
            '_submission_time' : ('parse_date',(('name_out','SubmissionDate'),('fmt_in','%Y-%m-%dT%H:%M:%S'))),
            '_submitted_by' : 'None',
            '_tags' : 'None',
            '_uuid' : 'KEY',
            '_validation_status' : 'None',
            '_xform_id_string' : 'None',
            'comment' : 'comment',
            'dead_stemrust_samples' : 'SET-OF-dead_stemrust_samples',
            'dead_stemrust_samples_count' : 'dead_stemrust_samples_count',
            'dead_yellowrust_samples' : 'SET-OF-dead_yellowrust_samples',
            'dead_yellowrust_samples_count' : 'dead_yellowrust_samples_count',
            'deviceid' : 'deviceid',
            'end' : ('parse_date',(('name_out','end'),('fmt_in','%Y-%m-%dT%H:%M:%S.%f%z'))),
            'formhub/uuid' : 'None',
            'imei' : 'imei',
            'leaf_rust/leafrust_host_plant_reaction' : 'leaf_rust-leafrust_host_plant_reaction',
            'leaf_rust/leafrust_incidence' : ('parse_cases',(('name_out','leaf_rust-leafrust_incidence'),('cases', cases_incident))),
            'leaf_rust/leafrust_severity' : 'leaf_rust-leafrust_severity',
            'live_leafrust_samples' : 'SET-OF-live_leafrust_samples',
            'live_leafrust_samples_count' : 'live_leafrust_samples_count',
            'live_stemrust_samples' : 'SET-OF-live_stemrust_samples',
            'live_stemrust_samples_count' : 'live_stemrust_samples_count',
            'live_yellowrust_samples' : 'SET-OF-live_yellowrust_samples',
            'live_yellowrust_samples_count' : 'live_yellowrust_samples_count',
            'meta/instanceID' : 'meta-instanceID',
            'samples_collected' : 'samples_collected',
            'samples_type' : 'samples_type',
            'septoria/septoria_incidence' : 'septoria-septoria_incidence',
            'septoria/septoria_severity' : 'septoria-septoria_severity',
            'site_information/crop' : 'site_information-crop',
            'site_information/field_area' : 'site_information-field_area',
            'site_information/growth_stage' : 'site_information-growth_stage',
            'site_information/survey_site' : 'site_information-survey_site',
            'site_information/variety' : 'site_information-variety',
            'start' : ('parse_date',(('name_out','start'),('fmt_in','%Y-%m-%dT%H:%M:%S.%f%z'))),
            'stem_rust/Stemrust_severity' : 'stem_rust-Stemrust_severity',
            'stem_rust/stemrust_host_plant_reaction' : 'stem_rust-stemrust_host_plant_reaction',
            'stem_rust/stemrust_incidence' : ('parse_cases',(('name_out','stem_rust-stemrust_incidence'),('cases', cases_incident))),
            'survey_infromation/location' : ('parse_location_kobotoolbox',()),
            'survey_infromation/location_name' : 'survey_infromation-location_name',
            'survey_infromation/survey_date' : ('parse_date',(('name_out','survey_infromation-survey_date'),('fmt_in','%Y-%m-%d'))),
            'surveyor_infromation/country' : 'surveyor_infromation-country',
            'surveyor_infromation/institution' : 'surveyor_infromation-institution',
            'surveyor_infromation/surveyor_name' : 'surveyor_infromation-surveyor_name',
            'today' : ('parse_date',(('name_out','today'),('fmt_in','%Y-%m-%d'))),
            'username' : 'username',
            'yellow_rust/yellowrust_host_plant_reaction' : 'yellow_rust-yellowrust_host_plant_reaction',
            'yellow_rust/yellowrust_incidence' : ('parse_cases',(('name_out','yellow_rust-yellowrust_incidence'),('cases', cases_incident))),
            'yellow_rust/yellowrust_severity' : 'yellow_rust-yellowrust_severity',
    }
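    # As a concrete illustration of the tuple entries above (a sketch; see
    # parse_columns in ProcessorSurveyUtils for the authoritative dispatch):
    # the '_submission_time' entry requests a date parse, so an input value
    # like '2023-01-01T09:30:00' is read with fmt_in '%Y-%m-%dT%H:%M:%S' and
    # written out to a column named 'SubmissionDate'.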
    logger.debug('Performing download')

    # perform a pull from the server, and if it fails write a warning message
    download_success = True

    skip_download: bool = config['Survey'].get('SkipServerDownload', False)

    if not skip_download:
        try:
            request = get_from_kobotoolbox(**form_credentials)
        except requests.exceptions.RequestException as e:
            logger.warning(f"Download from kobotoolbox server failed: {e}")
            status.reset('WARNING')
            download_success = False

    # define filenames
    csv_filename = f"SurveyData_{form_credentials['form_id']}.csv"

    csv_processed_filename = 'SurveyDataProcessed.csv'
    csv_processed_path = f"{output_path}{csv_processed_filename}"

    if download_success and not skip_download:

        # parse dataframe
        dataframe_raw = build_dataframe(request)

        logger.debug('Saving raw csv file')

        df_raw_filename = f"{output_path}{csv_filename}"
        dataframe_raw.to_csv(df_raw_filename, index=False, quoting=csv.QUOTE_MINIMAL)

        # process to match ODK format
        dataframe_processed = parse_columns(dataframe_raw, column_parser_dict)

        logger.debug('Saving processed csv file')

        dataframe_processed.to_csv(csv_processed_path, index=False, quoting=csv.QUOTE_MINIMAL)

    if not download_success or skip_download:

        logger.info("Because server download failed somewhere (or we are skipping downloads), trying to recover by copying recent download")

        copy_success = False

        days_back = 1
        acceptable_days_back = int(config['Survey']['AcceptableDowntimeDays'])
        logger.debug(f"Acceptable server downtime is set to {acceptable_days_back} days")

        while ((not copy_success) and (days_back <= acceptable_days_back)):

            current_date = datetime.datetime.strptime(config['StartString'], '%Y%m%d')

            past_date = current_date - datetime.timedelta(days=days_back)

            #past_jobPath = f"{config['WorkspacePathout']}{short_name[component]}_{past_date.strftime('%Y%m%d')}"
            past_jobPath = f"{config['WorkspacePath']}/SURVEYDATA_{past_date.strftime('%Y%m%d')}"

            past_output_path = f"{past_jobPath}/{output_dir}/"

            try:
                # check that python or perl coordinator script succeeded for that date
                success_py = os.path.isfile(f"{past_jobPath}/STATUS_SUCCESS")
                success_perl = os.path.isfile(f"{past_jobPath}/SURVEYDATA_SUCCESS.txt")
                assert success_py or success_perl

                past_csv_filename = csv_processed_filename

                logger.info(f"Looking for {past_output_path + past_csv_filename}")

                copyfile(past_output_path + past_csv_filename, csv_processed_path)

                assert os.path.isfile(csv_processed_path)

                copy_success = True
            except (AssertionError, OSError):
                logger.info(f"Did not find a kobotoolbox download in {past_output_path}")
                days_back += 1

        if not copy_success:
            logger.error('Failed to get a suitable copy of survey data.')
            status.reset('ERROR')
            endJob(status, premature=True)

        logger.warning(f"Using download from {past_jobPath}.")

    return csv_processed_path
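# A sketch of the inputs get_newODK_form_as_csv expects (values illustrative,
# supplied by the coordinator in practice; status must provide the reset()
# method used above):
#
#   config = {
#       'StartString' : '20230101',               # job date, %Y%m%d
#       'WorkspacePath' : '/path/to/workspace',   # where past job dirs live
#       'Survey' : {
#           'SkipServerDownload' : False,
#           'AcceptableDowntimeDays' : 7},
#   }
#
#   form_credentials = {
#       'url' : 'https://kf.kobotoolbox.org/',
#       'form_id' : 'akpyJHvYxkLKPkxFJnPyTW',
#       'form_token' : 'REPLACE_WITH_API_TOKEN'}
#
#   csv_path = get_newODK_form_as_csv(form_credentials, jobPath, config, status)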