Commit 6b82cfba authored by J.W. Smith

feat: Handle multiple ODK forms

Also, now:
* Using Python to incorporate the manual survey removals and additions.
* Attempting to grab a copy of the cluster calcs' processed survey data file.
parent cebca69b
{
"forms" : [
{
"server" : "https://csisaodk.appspot.com/",
"user" : "username",
"pass" : "passwd",
"form_id" : "wheat_rust_blast_survey_1_1",
"form_name" : "Wheat rust and blast survey 1_1"
},
{
"server" : "https://csisaodk.appspot.com/",
"user" : "username",
"pass" : "passwd",
"form_id" : "wheat_rust_survey_Afghanistan",
"form_name" : "Wheat rust survey for Afghanistan"
}
]
}
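
For illustration, a minimal sketch of how a credentials file with a 'forms' list like the one above can be consumed; the filename is a placeholder, and the loop mirrors the download loop later in this diff:

import json

# Placeholder path: the real credentials file location is configured elsewhere.
with open("credentials.json") as cred_file:
    cred = json.load(cred_file)

assert 'forms' in cred.keys()
for form in cred['forms']:
    # each entry carries its own server, login details and form identifiers
    print(form['form_id'], form['server'])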
@@ -8,12 +8,17 @@ import logging
import datetime
import subprocess
import json
import re
import csv
from glob import glob
from shutil import copyfile
from pathlib import Path
from string import Template
from distutils.dir_util import copy_tree
from numpy import all as np_all
from pandas import read_csv, concat
import NAMEPreProcessor as npp # script in EWS-Coordinator project
import EnvSuitPipeline as esp # script in EWS-Coordinator project
@@ -220,7 +225,13 @@ def get_ODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, stat
success_perl = os.path.isfile(f"{past_jobPath}/SURVEYDATA_SUCCESS.txt")
assert success_py or success_perl
copyfile(past_ODK_csv_path+ODK_csv_filename,ODK_csv_path+ODK_csv_filename)
#logger.warning(f"Temporary rename of expected previous download, for jobs before ~Apr 2021")
#past_ODK_csv_filename = f"SurveyData.csv"
past_ODK_csv_filename = ODK_csv_filename
logger.info(f"Looking for {past_ODK_csv_path+past_ODK_csv_filename}")
copyfile(past_ODK_csv_path+past_ODK_csv_filename,ODK_csv_path+ODK_csv_filename)
assert os.path.isfile(ODK_csv_path+ODK_csv_filename)
@@ -237,8 +248,8 @@ def get_ODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, stat
logger.warning(f"Using ODK download from {past_jobPath}.")
return
return ODK_csv_path+ODK_csv_filename
def process_in_job_survey(jobPath,status,config,component):
logger.info('started process_in_job_survey()')
@@ -251,22 +262,89 @@ def process_in_job_survey(jobPath,status,config,component):
assert 'forms' in cred.keys()
ODK_csv_filenames = []
ODK_csv_filenames = {}
for form in cred['forms']:
logger.debug(f"Starting to download {form['form_id']}")
ODK_csv_filename = get_ODK_form_as_csv(form, jobPath, config, status)
ODK_csv_filenames += [ODK_csv_filename]
ODK_csv_filenames[form['form_id']] = ODK_csv_filename
# load each file of surveys as a dataframe
forms = {}
for form_name,form_fn in ODK_csv_filenames.items():
# some define column types, hardwired for now
col_types = {'comment':'str'}
form_df = read_csv(form_fn,dtype=col_types)
forms[form_name] = form_df
# create some standard dataframe modification functions
def add_column(df,coln,value):
df[coln]=value
return
def remove_column(df,coln,value):
# value is unused here; the argument is kept so all edit functions share the same signature
del df[coln]
return
def replace_column(df,coln,value):
df[coln]=value
return
raise Exception
func_types = {
'add': add_column,
'remove' : remove_column,
'replace' : replace_column
}
# TODO: Align formatting of different SurveyData files
# The differences between the Afghanistan form and the BangNep form are:
# - Afghanistan lacks 'subscriberid'. So provide a blank value.
# simple format alignment using edits listed in the config
# (a sketch of the expected structure follows the loop below)
# (should this need to be much more sophisticated, reconsider the workflow)
if 'FormEdits' in config['Survey']:
# TODO: Merge additional SurveyData files
form_edits = config['Survey']['FormEdits']
# loop over each form
for form_name, edits in form_edits.items():
form_df = forms[form_name]
# loop over each type of edit
for func_type, columns in edits.items():
# check the function is available
assert func_type in func_types
# loop over each column to modify
for coln,val in columns.items():
# apply the edit
func_types[func_type](form_df,coln,val)
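# For reference: a sketch of the 'FormEdits' structure the loop above expects,
# written as the Python dict it resolves to. The form_id key and the blank
# 'subscriberid' follow the comments above; the exact config-file layout is an
# assumption, not the project's documented format.
form_edits_sketch = {
    'wheat_rust_survey_Afghanistan': {
        'add': {'subscriberid': ''},  # provide the missing column as a blank value
    },
}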
# Merge additional SurveyData files and rearrange columns to be consistent
# Assumes that the same columns are present in all forms
# and that the first form is the standard
first=True
for dfi in forms.values():
if first:
standard_columns = dfi.columns.tolist()
dfm = dfi
first=False
continue
dfi = dfi[standard_columns]
dfm = concat([dfm,dfi],axis='rows')
# save the result
ODK_csv_path = f"{jobPath}/ExportCSV/"
forms_fn = f"{ODK_csv_path}/Merged_SurveyData.csv"
dfm.to_csv(forms_fn,index=False,quoting=csv.QUOTE_MINIMAL)
logger.debug(f"Preparing to apply removals and additions to ODK survey data")
@@ -275,27 +353,43 @@ def process_in_job_survey(jobPath,status,config,component):
survey_errors_to_remove_filepath = f"{config['WorkspacePath']}/SURVEYDATA_MANUAL/SurveyDataErrorsToRemove.csv"
survey_additions_filepath = f"{config['WorkspacePath']}/SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv"
RPath = '/usr/local/R/bin/Rscript'
# perform here in python, using the 'KEY' column
# check the key column is unique
R_process_surveys = [RPath,
'--no-init-file',
coordinator_ppath+'/SurveyDataPreprocessor.R',
f"{ODK_csv_path}/{ODK_csv_filename}",
survey_errors_to_remove_filepath,
survey_additions_filepath,
processed_surveys_filepath]
assert dfm['KEY'].unique().size == dfm['KEY'].size, 'KEY column is not unique'
logger.debug(f"Applying removals and additions to {ODK_csv_filename}")
df_rm = read_csv(survey_errors_to_remove_filepath,dtype='str')
keys_to_rm = df_rm['KEY']
description_short = 'survey preprocessor'
description_long = 'process surveys with removals and additions'
# check that all of the keys to remove exist in the original data
rm_keys_found = df_rm['KEY'].apply(lambda cell: cell in dfm['KEY'].values)
n_rm_keys_found = rm_keys_found.sum()
n_rm_keys = rm_keys_found.size
if not np_all(rm_keys_found):
# this might happen if the run date is in the past
logger.warning(f"Only found {n_rm_keys_found} of {n_rm_keys} survey errors to remove")
try:
subprocess_and_log(R_process_surveys, description_short, description_long)
except:
status.reset('ERROR')
endJob(status,premature=True)
rm_keys_not_found = df_rm[~rm_keys_found]
logger.debug(f"Erroneous entries not found are:\n{rm_keys_not_found}")
# identify which surveys to remove
idx_to_rm = dfm['KEY'].apply(lambda cell: cell in keys_to_rm.values)
# drop the flagged rows (note: reassignment, not an in-place drop)
dfm = dfm[~idx_to_rm]
logger.info(f"Removed {n_rm_keys_found} erroneous surveys")
# add the extra entries
df_add = read_csv(survey_additions_filepath,dtype='str')
n_add_keys = df_add.shape[0]
df_join = concat([dfm,df_add])
assert dfm.shape[0]+df_add.shape[0] == df_join.shape[0], 'Unexpected result of including additional surveys'
logger.info(f"Added {n_add_keys} additional surveys")
# save as processed
df_join.to_csv(processed_surveys_filepath,index=False,quoting=csv.QUOTE_MINIMAL)
logger.debug('Preparing clustering calculation')
date = datetime.datetime.now()
@@ -312,14 +406,17 @@ def process_in_job_survey(jobPath,status,config,component):
Path(path).unlink()
# prepare environment for clustering calc
RPath = '/usr/local/R/bin/Rscript'
clustering_script = f"{cluster_calc_path}/code/R/clustering.R"
clustering_env = {
**os.environ,
'R_LIBS':'/home/ewsmanager/R-packages-EWS-clustering/x86_64-pc-linux-gnu-library/3.5',
'PROJ_LIB' : '/usr/share/proj/', # conda env breaks the automatic assignment of PROJ_LIB
}
clustering_script = f"{cluster_calc_path}/code/R/clustering.R"
clustering_calc = [RPath,
'--no-init-file',
clustering_script,
@@ -341,6 +438,35 @@ def process_in_job_survey(jobPath,status,config,component):
logger.debug('Checking output of clustering calculation')
output_directory = f"{jobPath}/SURVEYDATA_{config['StartString']}_0000"
Path(output_directory).mkdir(parents=True, exist_ok=True)
try:
logger.debug('Trying to copy the dataset processed for clustering')
clustering_proc_path_glob = f"{cluster_calc_path}/output/survey_data_processed_{date.strftime('%Y-%m-%d')}_*.csv"
clustering_proc_path_list = glob(clustering_proc_path_glob)
if len(clustering_proc_path_list) == 0:
logger.debug(f"No processed files produced from clustering in {clustering_proc_path_glob}")
raise Exception
elif len(clustering_proc_path_list) > 1:
logger.debug(f"Multiple processed files produced from clustering in {clustering_proc_path_glob}")
raise Exception
else:
logger.debug('Found 1 processed file, placing copy of result in job directory')
proc_filename = f"survey_data_processed_{config['StartString']}.csv"
proc_path = f"{output_directory}/{proc_filename}"
logger.debug(f"as {proc_path}")
copyfile(clustering_proc_path_list[0], proc_path)
except:
logger.debug('Failed to get a copy of the dataset processed for clustering')
clustering_output_path_glob = f"{cluster_calc_path}/output/sources_{date.strftime('%Y-%m-%d')}_*.csv"
clustering_output_path_list = glob(clustering_output_path_glob)
if len(clustering_output_path_list) == 0:
@@ -354,9 +480,6 @@ def process_in_job_survey(jobPath,status,config,component):
logger.debug('Placing copy of result in job directory')
output_directory = f"{jobPath}/SURVEYDATA_{config['StartString']}_0000"
Path(output_directory).mkdir(parents=True, exist_ok=True)
output_filename = f"sources_{config['StartString']}.csv"
output_path = f"{output_directory}/{output_filename}"
(R survey data preprocessor script, shown in full below)
options(stringsAsFactors = FALSE)
library(dplyr)
args = commandArgs(TRUE)
originalDataFileName = args[1]
print(paste0("ORIGINAL DATA: ", originalDataFileName))
deleteRowsFileName = args[2]
print(paste0("DELETE DATA: ", deleteRowsFileName))
addRowsFileName = args[3]
print(paste0("ADD DATA: ", addRowsFileName))
outputFileName = args[4]
print(paste0("OUTPUT DATA: ", outputFileName))
#colClasses must be used, otherwise R infers the wrong types for the deleteData table,
#which has much less data to work with, and therefore dplyr can't match rows between tables
#check.names is essential, as otherwise the characters in the column names are arbitrarily butchered and no longer match what downstream elements require
originalData = read.csv(originalDataFileName, header = TRUE, check.names=FALSE, colClasses = "character")
print(paste0("Original rows: ", nrow(originalData)))
#Remove unwanted rows:
deleteData = read.csv(deleteRowsFileName, header = TRUE, check.names=FALSE, colClasses = "character")
print(paste0("Deletion rows: ", nrow(deleteData)))
cleanData = anti_join(originalData, deleteData)
print(paste0("Clean rows: ", nrow(cleanData)))
#Check and see if each row from deleteData was removed from originalData - warn if rows not removed
nRowNotDeleted = nrow(cleanData) + nrow(deleteData) - nrow(originalData)
if(nRowNotDeleted != 0) {
print(paste0("WARNING: Did not find a match for ", nRowNotDeleted, " rows in the deletion table"))
}
#Add extra rows:
extraData = read.csv(addRowsFileName, header = TRUE, check.names=FALSE, colClasses = "character")
print(paste0("Extra rows: ", nrow(extraData)))
joinedData = rbind(cleanData, extraData)
print(paste0("Joined rows: ", nrow(joinedData)))
#Write output:
write.csv(joinedData, file = outputFileName, row.names = FALSE, quote = TRUE)
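
For completeness, a sketch of how a preprocessor script with this argument order can be driven from Python, mirroring the R_process_surveys invocation shown earlier in the diff; all file paths below are placeholders:

import subprocess

# Placeholder paths, for illustration only; the entries map to args[1]..args[4] above.
r_process_surveys = [
    '/usr/local/R/bin/Rscript',
    '--no-init-file',
    'SurveyDataPreprocessor.R',          # the preprocessor script (named in the Python diff above)
    'SurveyData.csv',                    # original data (args[1])
    'SurveyDataErrorsToRemove.csv',      # rows to delete (args[2])
    'LIVE_SURVEYDATA_TOUSE.csv',         # rows to add (args[3])
    'SurveyData_processed.csv',          # output (args[4])
]
subprocess.run(r_process_surveys, check=True)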