From 174ab5edf31f4f357cd9a5c89a3dba6069dad333 Mon Sep 17 00:00:00 2001
From: jws52 <jws52@cam.ac.uk>
Date: Wed, 31 Jan 2024 16:30:12 +0000
Subject: [PATCH] feat: South Asia ODK server format handled separately

This moves field area calculations out of the source_gen package and into the server-specific load function.
---
 coordinator/ProcessorSurveyUtils.py  |   4 +-
 coordinator/ProcessorSurveys.py      |   2 +
 coordinator/ProcessorSurveysODKSA.py | 267 +++++++++++++++++++++++++++
 3 files changed, 271 insertions(+), 2 deletions(-)
 create mode 100644 coordinator/ProcessorSurveysODKSA.py

diff --git a/coordinator/ProcessorSurveyUtils.py b/coordinator/ProcessorSurveyUtils.py
index 100b432..dc3c39f 100644
--- a/coordinator/ProcessorSurveyUtils.py
+++ b/coordinator/ProcessorSurveyUtils.py
@@ -110,8 +110,8 @@ def parse_columns(df_in,coln_parser_dict):
     - values that are 'None' mean they should be dropped
     - values that are string simply rename the column
     - values that are tuples should be a runnable function with kwargs, where
-    the first item is the string identifier of the functionre and the rest is a
-    list of key,value pairs to be provided as kwargs, returns series/dataframe,
+    the first item is the string identifier of the function and the rest is a
+    list of key, value pairs to be provided as kwargs, returns series/dataframe,
     and drops key column.
     # TODO: is it neccesary to provide dtype conversion somewhere (e.g. dates)?'''
 
diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py
index 0a70a84..9b6d9e0 100644
--- a/coordinator/ProcessorSurveys.py
+++ b/coordinator/ProcessorSurveys.py
@@ -33,6 +33,7 @@ from Processor import Processor
 from source_gen.clustering import run_case
 
 from ProcessorSurveysODK import get_ODK_form_as_csv
+from ProcessorSurveysODKSA import get_ODK_SA_form_as_csv
 from ProcessorSurveysODKv2 import get_ODKv2_form_as_csv
 from ProcessorSurveysnewODK import get_newODK_form_as_csv
 from ProcessorSurveysnewODK2 import get_newODK2_form_as_csv
@@ -67,6 +68,7 @@ class ProcessorSurveys(Processor):
 
         self.GET_FORM_AS_CSV_DICT = {
             'ODK':         get_ODK_form_as_csv,
+            'ODK_SA':      get_ODK_SA_form_as_csv,
             'kobotoolbox': get_kobotoolbox_form_as_csv,
             'WRSIS':       get_WRSIS_form_as_csv,
             'WRT':         get_WRT_form_as_csv,
diff --git a/coordinator/ProcessorSurveysODKSA.py b/coordinator/ProcessorSurveysODKSA.py
new file mode 100644
index 0000000..4ba4784
--- /dev/null
+++ b/coordinator/ProcessorSurveysODKSA.py
@@ -0,0 +1,267 @@
+#ProcessorSurveysODKSA.py
+"""Functions for parsing wheat rust survey records from ODK with South Asia
+formatting."""
+
+import datetime
+import logging
+import os
+from csv import QUOTE_MINIMAL
+from pathlib import Path
+from string import Template
+import subprocess
+
+from pandas import read_csv
+from shutil import copyfile
+
+from ProcessorSurveyUtils import parse_columns
+from ProcessorUtils import (
+        subprocess_and_log,
+        endJob,
+        add_filters_to_sublogger,
+)
+
+logger = logging.getLogger('Processor.Surveys.ODK')
+add_filters_to_sublogger(logger)
+
# Column-parsing spec consumed by ProcessorSurveyUtils.parse_columns:
# - keys are column names in the input dataframe
# - values that are the string 'None' mean the column should be dropped
# - values that are strings simply rename the column
# - values that are (function, kwargs) tuples are run for that key and return a
#   series/dataframe
# NOTE: 'infromation' is the spelling used by the ODK form itself, so it must
# be preserved here.
# The field-area trio (field_area / unit_m2 / field_size) is kept verbatim and
# converted to hectares later in get_ODK_SA_form_as_csv.
COLUMN_PARSER_DICT = {
        'SubmissionDate' : 'SubmissionDate',
        'start' : 'start',
        'end' : 'end',
        'today' : 'today',
        'deviceid' : 'deviceid',
        'subscriberid' : 'subscriberid',
        'imei' : 'imei',
        'phonenumber' : 'phonenumber',
        'username' : 'username',
        'country_list' : 'surveyor_infromation-country',
        'blast_rust' : 'None',
        'surveyor_name' : 'surveyor_infromation-surveyor_name',
        'institution' : 'surveyor_infromation-institution',
        'mobile_num' : 'None',
        'site_information-survey_site' : 'site_information-survey_site',
        'site_information-crop' : 'site_information-crop',
        'site_information-field_area' : 'site_information-field_area', # requires conversion later
        'site_information-unit_m2' : 'site_information-unit_m2',  # requires conversion later
        'site_information-field_size' : 'site_information-field_size',  # requires conversion later
        'site_information-variety' : 'site_information-variety',
        'site_information-growth_stage' : 'site_information-growth_stage',
        'other_crop' : 'None',
        'survey_infromation-location_name' : 'survey_infromation-location_name',
        'survey_infromation-location_blast' : 'survey_infromation-location_blast',
        'survey_infromation-sampColor' : 'survey_infromation-sampColor',
        'survey_infromation-dateRange' : 'survey_infromation-dateRange',
        'survey_infromation-fieldNumber' : 'survey_infromation-fieldNumber',
        'survey_infromation-diseaseIncidencePercentage' : 'survey_infromation-diseaseIncidencePercentage',
        'survey_infromation-severityPercentage' : 'survey_infromation-severityPercentage',
        'survey_infromation-survey_date' : 'survey_infromation-survey_date',
        'survey_infromation-site_name' : 'survey_infromation-location_name',
        'survey_infromation-location-Latitude' : 'survey_infromation-location-Latitude',
        'survey_infromation-location-Longitude' : 'survey_infromation-location-Longitude',
        'survey_infromation-location-Altitude' : 'survey_infromation-location-Altitude',
        'survey_infromation-location-Accuracy' : 'survey_infromation-location-Accuracy',
        'stem_rust-stemrust_incidence' : 'stem_rust-stemrust_incidence',
        'stem_rust-Stemrust_severity' : 'stem_rust-Stemrust_severity',
        'stem_rust-stemrust_host_plant_reaction' : 'stem_rust-stemrust_host_plant_reaction',
        'leaf_rust-leafrust_incidence' : 'leaf_rust-leafrust_incidence',
        'leaf_rust-leafrust_severity' : 'leaf_rust-leafrust_severity',
        'leaf_rust-leafrust_host_plant_reaction' : 'leaf_rust-leafrust_host_plant_reaction',
        'yellow_rust-yellowrust_incidence' : 'yellow_rust-yellowrust_incidence',
        'yellow_rust-yellowrust_severity' : 'yellow_rust-yellowrust_severity',
        'yellow_rust-yellowrust_host_plant_reaction' : 'yellow_rust-yellowrust_host_plant_reaction',
        'septoria-septoria_incidence' : 'septoria-septoria_incidence',
        'septoria-septoria_severity' : 'septoria-septoria_severity',
        'other_diseases_group-other_diseases' : 'other_diseases_group-other_diseases',
        'score_diseases_count' : 'score_diseases_count',
        'SET-OF-score_diseases' : 'SET-OF-score_diseases',
        'samples_collected' : 'samples_collected',
        'samples_type' : 'samples_type',
        'sample_size-number_stemrust_live' : 'sample_size-number_stemrust_live',
        'sample_size-number_stemrust_dead_dna' : 'sample_size-number_stemrust_dead_dna',
        'sample_size-number_yellowrust_live' : 'sample_size-number_yellowrust_live',
        'sample_size-number_yellowrust_dead' : 'sample_size-number_yellowrust_dead',
        'sample_size-number_leafrust_live' : 'sample_size-number_leafrust_live',
        'sample_size-using_barcode' : 'sample_size-using_barcode',
        'live_stemrust_samples_count' : 'live_stemrust_samples_count',
        'SET-OF-live_stemrust_samples' : 'SET-OF-live_stemrust_samples',
        'dead_stemrust_samples_count' : 'dead_stemrust_samples_count',
        'SET-OF-dead_stemrust_samples' : 'SET-OF-dead_stemrust_samples',
        'live_yellowrust_samples_count' : 'live_yellowrust_samples_count',
        'SET-OF-live_yellowrust_samples' : 'SET-OF-live_yellowrust_samples',
        'dead_yellowrust_samples_count' : 'dead_yellowrust_samples_count',
        'SET-OF-dead_yellowrust_samples' : 'SET-OF-dead_yellowrust_samples',
        'live_leafrust_samples_count' : 'live_leafrust_samples_count',
        'SET-OF-live_leafrust_samples' : 'SET-OF-live_leafrust_samples',
        'comment' : 'comment',
        'meta-instanceID' : 'meta-instanceID',
        'meta-instanceName' : 'None',
        'KEY' : 'KEY',
        }
# BUG FIX: removed a leftover second assignment that redefined
# COLUMN_PARSER_DICT as a single-entry dict ({'site_information-field_area':
# ...}), clobbering the full mapping above. With that redefinition in place,
# parse_columns would not have produced the unit_m2/field_size columns that
# the area conversion in get_ODK_SA_form_as_csv reads.
+
def get_ODK_SA_form_as_csv(form_credentials: dict, jobPath: str, config: dict, status):
    '''Given a dict with a single ODK form to download from an ODK Aggregate
    server with South Asia formatting, obtains it and converts to csv.

    The South Asia form reports field area as a number of local units
    (site_information-field_size) and the area of one local unit in m2
    (site_information-unit_m2); these are combined here into a single
    field-area column in hectares, so downstream consumers see the same
    format as the other ODK servers.

    If the server pull or csv export fails (or SkipServerDownload is set),
    recovery is attempted by copying the processed csv from a recent previous
    job, looking back up to config['Survey']['AcceptableDowntimeDays'] days.

    Returns the path of the processed csv file. Calls endJob (ERROR status)
    if neither a fresh download nor a recent copy is available.'''

    ODK_output_path_template = config['Survey'].get('ODKDatabasePathTemplate','${WorkspacePathout}/ODK_DB/')
    ODK_output_path = Template(ODK_output_path_template).substitute(**config)

    # get data from ODK server
    description_short = 'ODK_SA download'
    description_long = 'survey download from ODK_SA server'

    # get path to ODK executable (ODK Briefcase jar)
    ODK_jar = form_credentials['ODK_jar']
    assert os.path.exists(ODK_jar)

    skip_download: bool = config['Survey'].get('SkipServerDownload', False)

    ODK_download_success = True

    if not skip_download:
        try:
            ODK_download = ['java',
                            '-jar', ODK_jar,
                            '--pull_aggregate',
                            '--form_id', form_credentials['form_id'],
                            '--storage_directory', ODK_output_path,
                            '--odk_url', form_credentials['url'],
                            '--odk_username', form_credentials['user'],
                            '--odk_password', form_credentials['pass']]

            logger.debug('Performing ' + description_long)

            # perform a pull from the ODK server, and if it fails write a warning message
            subprocess_and_log(ODK_download,description_short,description_long,log_type='warning',check=True)

        except subprocess.CalledProcessError as e:
            # a failed pull is recoverable below, so downgrade to a warning
            status.reset('WARNING')
            ODK_download_success = False

    #TODO: Check it came down cleanly ($serverOutputDir is created whether cleanly or not, so test more explicitly):

    ODK_csv_path = f"{jobPath}/ExportCSV/"

    Path(ODK_csv_path).mkdir(parents = True, exist_ok = True)

    ODK_raw_csv_filename = f"SurveyData_{form_credentials['form_id']}.csv"

    ODK_proc_csv_filename = f"SurveyData_{form_credentials['form_id']}_proc.csv"

    if ODK_download_success and not skip_download:

        description_short = 'ODK export'
        description_long = 'converting ODK download to csv'
        logger.debug(description_long)

        ODK_java_to_csv = ['java',
                '-jar', ODK_jar,
                '--export',
                '--form_id', form_credentials['form_id'],
                '--storage_directory',ODK_output_path,
                '--export_directory',ODK_csv_path,
                '--export_filename',ODK_raw_csv_filename]

        logger.debug('Performing ' + description_long)

        try:
            subprocess_and_log(ODK_java_to_csv,description_short,description_long,check=True)

        except subprocess.CalledProcessError as e:
            status.reset('WARNING')
            ODK_download_success = False

    # BUG FIX: re-check the flag before processing. Previously the raw csv was
    # read unconditionally after the export attempt, so a failed export
    # crashed on the missing file instead of falling through to the recovery
    # path below.
    if ODK_download_success and not skip_download:

        # read the raw ODK_SA survey csv
        dataframe_raw = read_csv(ODK_csv_path+ODK_raw_csv_filename)

        # process to match ODK format
        dataframe_processed = parse_columns(dataframe_raw,COLUMN_PARSER_DICT)

        # parse area information
        area_coln = 'site_information-field_area'
        unit_m2_coln = 'site_information-unit_m2'
        field_size_coln = 'site_information-field_size'

        # this column should all be strings describing the local unit
        area = dataframe_processed[area_coln]

        field_size = dataframe_processed[field_size_coln] # number of local units surveyed
        area_unit_m2 = dataframe_processed[unit_m2_coln] # area of local unit in m2

        ha_per_m2 = 1.e-4 # hectare in units of m2

        # field area in hectares = (count of local units) * (m2 per unit) * (ha per m2)
        area_in_ha = field_size * area_unit_m2 * ha_per_m2

        # overwrite the unit-name column with the numeric area, and drop the
        # two source columns so the output matches the other ODK formats
        dataframe_processed[area_coln] = area_in_ha
        del dataframe_processed[unit_m2_coln]
        del dataframe_processed[field_size_coln]

        logger.debug('Saving processed csv file')

        dataframe_processed.to_csv(ODK_csv_path+ODK_proc_csv_filename,index=False,quoting=QUOTE_MINIMAL)

    if not ODK_download_success or skip_download:

        logger.info("Because ODK server download failed somewhere (or we are skipping downloads), trying to recover by copying recent download")

        ODK_copy_success = False

        days_back = 1
        acceptable_days_back = int(config['Survey']['AcceptableDowntimeDays'])
        logger.debug(f"Acceptable server downtime is set to {acceptable_days_back} days")

        while ((not ODK_copy_success) and (days_back <= acceptable_days_back)):
            current_date = datetime.datetime.strptime(config['StartString'],'%Y%m%d')

            past_date = current_date - datetime.timedelta(days=days_back)

            past_jobPath = f"{config['WorkspacePath']}/SURVEYDATA_{past_date.strftime('%Y%m%d')}"

            past_ODK_csv_path = f"{past_jobPath}/ExportCSV/"

            try:
                # check that python or perl coordinator script succeeded for that date
                success_py = os.path.isfile(f"{past_jobPath}/STATUS_SUCCESS")
                success_with_warning_py = os.path.isfile(f"{past_jobPath}/STATUS_SUCCESS_WITH_WARNING")
                success_perl = os.path.isfile(f"{past_jobPath}/SURVEYDATA_SUCCESS.txt")
                assert success_py or success_with_warning_py or success_perl

                past_ODK_csv_filename = ODK_proc_csv_filename

                logger.info(f"Looking for {past_ODK_csv_path+past_ODK_csv_filename}")
                print(f"Looking for {past_ODK_csv_path+past_ODK_csv_filename}")

                copyfile(past_ODK_csv_path+past_ODK_csv_filename,ODK_csv_path+ODK_proc_csv_filename)

                assert os.path.isfile(ODK_csv_path+ODK_proc_csv_filename)

                ODK_copy_success = True
            except Exception:
                # narrowed from a bare except so KeyboardInterrupt/SystemExit
                # still propagate; any failure just moves to the next day back
                logger.info(f"Not found an ODK download in {past_ODK_csv_path}")

            days_back += 1

        if not ODK_copy_success:
            logger.error("Failed to get a suitable copy of survey data.")
            status.reset('ERROR')
            endJob(status,premature=True)

        logger.warning(f"Using ODK download from {past_jobPath}.")

    return ODK_csv_path+ODK_proc_csv_filename
-- 
GitLab