diff --git a/Cred-ODK-SouthAsia-example.json b/Cred-ODK-SouthAsia-example.json
new file mode 100644
index 0000000000000000000000000000000000000000..5c54ca9bedec30bc1607c75eed02b32fee81f94f
--- /dev/null
+++ b/Cred-ODK-SouthAsia-example.json
@@ -0,0 +1,18 @@
+{
+    "forms" : [
+        {
+            "server" : "https://csisaodk.appspot.com/",
+            "user" : "username",
+            "pass" : "passwd",
+            "form_id" : "wheat_rust_blast_survey_1_1",
+            "form_name" : "Wheat rust and blast survey 1_1"
+        },
+        {
+            "server" : "https://csisaodk.appspot.com/",
+            "user" : "username",
+            "pass" : "passwd",
+            "form_id" : "wheat_rust_survey_Afghanistan",
+            "form_name" : "Wheat rust survey for Afghanistan"
+        }
+    ]
+}
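Reviewer note: a minimal sketch of how a credentials file in this shape is consumed. It mirrors the `json.load` / `cred['forms']` pattern in `process_in_job_survey()` below; the `print` is illustrative only.

```python
import json

# Load a credentials file shaped like the example above and loop over
# its forms, as process_in_job_survey() does.
with open('Cred-ODK-SouthAsia-example.json') as cred_file:
    cred = json.load(cred_file)

assert 'forms' in cred.keys()

for form in cred['forms']:
    # each entry provides the server plus the login and form identifiers
    # that one get_ODK_form_as_csv() download needs
    print(form['form_id'], form['server'])
```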
diff --git a/ProcessorComponents.py b/ProcessorComponents.py
index 8d30deb5b16dcfd2b81b3fe4b95bd3603f40491a..6ccfeed25a0c2f89ed37a348358863b21215a43c 100644
--- a/ProcessorComponents.py
+++ b/ProcessorComponents.py
@@ -8,12 +8,17 @@ import logging
 import datetime
 import subprocess
 import json
+import re
+import csv
 from glob import glob
 from shutil import copyfile
 from pathlib import Path
 from string import Template
 from distutils.dir_util import copy_tree
+from numpy import all as np_all
+from pandas import read_csv, concat
+
 import NAMEPreProcessor as npp # script in EWS-Coordinator project
 import EnvSuitPipeline as esp # script in EWS-Coordinator project
@@ -220,7 +225,13 @@ def get_ODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, stat
         success_perl = os.path.isfile(f"{past_jobPath}/SURVEYDATA_SUCCESS.txt")
         assert success_py or success_perl
 
-        copyfile(past_ODK_csv_path+ODK_csv_filename,ODK_csv_path+ODK_csv_filename)
+        #logger.warning(f"Temporary rename of expected previous download, for jobs before ~Apr 2021")
+        #past_ODK_csv_filename = f"SurveyData.csv"
+        past_ODK_csv_filename = ODK_csv_filename
+
+        logger.info(f"Looking for {past_ODK_csv_path+past_ODK_csv_filename}")
+
+        copyfile(past_ODK_csv_path+past_ODK_csv_filename,ODK_csv_path+ODK_csv_filename)
 
         assert os.path.isfile(ODK_csv_path+ODK_csv_filename)
@@ -237,8 +248,8 @@ def get_ODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, stat
 
     logger.warning(f"Using ODK download from {past_jobPath}.")
 
-    return
-
+    return ODK_csv_path+ODK_csv_filename
+
 def process_in_job_survey(jobPath,status,config,component):
     logger.info('started process_in_job_survey()')
@@ -251,22 +262,89 @@ def process_in_job_survey(jobPath,status,config,component):
 
     assert 'forms' in cred.keys()
 
-    ODK_csv_filenames = []
+    ODK_csv_filenames = {}
     for form in cred['forms']:
 
         logger.debug(f"Starting to download {form['form_id']}")
 
         ODK_csv_filename = get_ODK_form_as_csv(form, jobPath, config, status)
 
-        ODK_csv_filenames += [ODK_csv_filename]
+        ODK_csv_filenames[form['form_id']] = ODK_csv_filename
+
+    # load each file of surveys as a dataframe
+    forms = {}
+    for form_name,form_fn in ODK_csv_filenames.items():
+
+        # define data types for some columns, hardwired for now
+        col_types = {'comment':'str'}
+
+        form_df = read_csv(form_fn,dtype=col_types)
+
+        forms[form_name] = form_df
+
+    # create some standard dataframe modification functions
+    def add_column(df,coln,value):
+        df[coln]=value
+        return
+
+    def remove_column(df,coln,value): # value is unused, kept so all edit functions share one signature
+        del df[coln]
+        return
+
+    def replace_column(df,coln,value):
+        df[coln]=value
+        return
 
-    raise Exception
+    func_types = {
+        'add': add_column,
+        'remove' : remove_column,
+        'replace' : replace_column
+    }
 
-    # TODO: Align formatting of different SurveyData files
-    # The differences between the Afghanistan form and the BangNep form are:
-    # - Afghanistan lacks 'subscriberid'. So provide a blank value.
+    # simple format alignment using edits defined in the config
+    # (if this needs to be much more sophisticated, reconsider the workflow)
+    if 'FormEdits' in config['Survey']:
 
-    # TODO: Merge additional SurveyData files
+        form_edits = config['Survey']['FormEdits']
+
+        # loop over each form
+        for form_name, edits in form_edits.items():
+
+            form_df = forms[form_name]
+
+            # loop over each type of edit
+            for func_type, columns in edits.items():
+
+                # check the function is available
+                assert func_type in func_types
+
+                # loop over each column to modify
+                for coln,val in columns.items():
+
+                    # apply the edit
+                    func_types[func_type](form_df,coln,val)
+
+    # Merge additional SurveyData files and rearrange columns to be consistent
+    # Assumes that the same columns are present in all forms
+    # and that the first form is the standard
+
+    first=True
+    for dfi in forms.values():
+
+        if first:
+            standard_columns = dfi.columns.tolist()
+            dfm = dfi
+            first=False
+            continue
+
+        dfi = dfi[standard_columns]
+
+        dfm = concat([dfm,dfi],axis='rows')
+
+    # save the result
+    ODK_csv_path = f"{jobPath}/ExportCSV/"
+    forms_fn = f"{ODK_csv_path}Merged_SurveyData.csv"
+    dfm.to_csv(forms_fn,index=False,quoting=csv.QUOTE_MINIMAL)
 
     logger.debug(f"Preparing to apply removals and additions to ODK survey data")
@@ -275,27 +353,43 @@ def process_in_job_survey(jobPath,status,config,component):
     survey_errors_to_remove_filepath = f"{config['WorkspacePath']}/SURVEYDATA_MANUAL/SurveyDataErrorsToRemove.csv"
     survey_additions_filepath = f"{config['WorkspacePath']}/SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv"
 
-    RPath = '/usr/local/R/bin/Rscript'
+    # removals and additions are performed here in python, matching on the 'KEY' column
+    # first check that the KEY column is unique
 
-    R_process_surveys = [RPath,
-            '--no-init-file',
-            coordinator_ppath+'/SurveyDataPreprocessor.R',
-            f"{ODK_csv_path}/{ODK_csv_filename}",
-            survey_errors_to_remove_filepath,
-            survey_additions_filepath,
-            processed_surveys_filepath]
+    assert dfm['KEY'].unique().size == dfm['KEY'].size, 'KEY column is not unique'
 
-    logger.debug(f"Applying removals and additions to {ODK_csv_filename}")
+    df_rm = read_csv(survey_errors_to_remove_filepath,dtype='str')
+    keys_to_rm = df_rm['KEY']
 
-    description_short = 'survey preprocessor'
-    description_long = 'process surveys with removals and additions'
+    # check that all of the keys to remove exist in the original data
+    rm_keys_found = df_rm['KEY'].apply(lambda cell: cell in dfm['KEY'].values)
+    n_rm_keys_found = rm_keys_found.sum()
+    n_rm_keys = rm_keys_found.size
+    if not np_all(rm_keys_found):
+        # this might happen if the run date is in the past
+        logger.warning(f"Only found {n_rm_keys_found} of {n_rm_keys} survey errors to remove")
 
-    try:
-        subprocess_and_log(R_process_surveys, description_short, description_long)
-    except:
-        status.reset('ERROR')
-        endJob(status,premature=True)
+        rm_keys_not_found = df_rm[~rm_keys_found]
+        logger.debug(f"Erroneous entries not found are:\n{rm_keys_not_found}")
+
+    # identify which surveys to remove
+    idx_to_rm = dfm['KEY'].apply(lambda cell: cell in keys_to_rm.values)
+    # drop them (boolean indexing returns a filtered copy, not an in-place edit)
+    dfm = dfm[~idx_to_rm]
+    logger.info(f"Removed {n_rm_keys_found} erroneous surveys")
+
+    # add the extra entries
+    df_add = read_csv(survey_additions_filepath,dtype='str')
+    n_add_keys = df_add.shape[0]
+    df_join = concat([dfm,df_add])
+    assert dfm.shape[0]+df_add.shape[0] == df_join.shape[0], 'Unexpected result of including additional surveys'
+
+    logger.info(f"Added {n_add_keys} additional surveys")
+
+    # save the processed surveys
+    df_join.to_csv(processed_surveys_filepath,index=False,quoting=csv.QUOTE_MINIMAL)
+
     logger.debug('Preparing clustering calculation')
 
     date = datetime.datetime.now()
@@ -312,14 +406,17 @@ def process_in_job_survey(jobPath,status,config,component):
         Path(path).unlink()
 
     # prepare environment for clustering calc
+
+    RPath = '/usr/local/R/bin/Rscript'
+
+    clustering_script = f"{cluster_calc_path}/code/R/clustering.R"
+
     clustering_env = {
             **os.environ,
             'R_LIBS':'/home/ewsmanager/R-packages-EWS-clustering/x86_64-pc-linux-gnu-library/3.5',
             'PROJ_LIB' : '/usr/share/proj/', # conda env breaks the automatic assignment of PROJ_LIB
     }
 
-    clustering_script = f"{cluster_calc_path}/code/R/clustering.R"
-
     clustering_calc = [RPath,
             '--no-init-file',
             clustering_script,
@@ -341,6 +438,35 @@ def process_in_job_survey(jobPath,status,config,component):
 
     logger.debug('Checking output of clustering calculation')
 
+    output_directory = f"{jobPath}/SURVEYDATA_{config['StartString']}_0000"
+    Path(output_directory).mkdir(parents=True, exist_ok=True)
+
+    try:
+        logger.debug('Trying to copy the dataset processed for clustering')
+
+        clustering_proc_path_glob = f"{cluster_calc_path}/output/survey_data_processed_{date.strftime('%Y-%m-%d')}_*.csv"
+        clustering_proc_path_list = glob(clustering_proc_path_glob)
+        if len(clustering_proc_path_list) == 0:
+            logger.debug(f"No processed files produced from clustering in {clustering_proc_path_glob}")
+            raise Exception
+
+        elif len(clustering_proc_path_list) > 1:
+            logger.debug(f"Multiple processed files produced from clustering in {clustering_proc_path_glob}")
+            raise Exception
+
+        else:
+            logger.debug('Found 1 processed file, placing copy of result in job directory')
+
+            proc_filename = f"survey_data_processed_{config['StartString']}.csv"
+            proc_path = f"{output_directory}/{proc_filename}"
+
+            logger.debug(f"as {proc_path}")
+
+            copyfile(clustering_proc_path_list[0], proc_path)
+
+    except:
+        logger.debug('Failed to get a copy of the dataset processed for clustering')
+
     clustering_output_path_glob = f"{cluster_calc_path}/output/sources_{date.strftime('%Y-%m-%d')}_*.csv"
     clustering_output_path_list = glob(clustering_output_path_glob)
     if len(clustering_output_path_list) == 0:
@@ -354,9 +480,6 @@ def process_in_job_survey(jobPath,status,config,component):
 
     logger.debug('Placing copy of result in job directory')
 
-    output_directory = f"{jobPath}/SURVEYDATA_{config['StartString']}_0000"
-    Path(output_directory).mkdir(parents=True, exist_ok=True)
-
     output_filename = f"sources_{config['StartString']}.csv"
     output_path = f"{output_directory}/{output_filename}"
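Reviewer note: the shape of the `FormEdits` block is not shown in this diff, but the loop above requires a nested mapping of form id → edit type → column → value. A hedged sketch of that mechanism in isolation; the form id, column names and cell values are invented for illustration (the `subscriberid` example comes from the TODO comment this patch removes):

```python
from pandas import DataFrame

# Invented illustration of the nested mapping the FormEdits loop expects:
# form_id -> edit type -> column name -> value.
form_edits = {
    'wheat_rust_survey_Afghanistan': {
        'add': {'subscriberid': ''},  # Afghanistan lacks 'subscriberid', so provide a blank value
    },
}

# toy stand-in for the forms dict built from the downloaded CSVs
forms = {'wheat_rust_survey_Afghanistan': DataFrame({'comment': ['some rust seen']})}

def add_column(df, coln, value):
    df[coln] = value

func_types = {'add': add_column}

# same dispatch structure as in process_in_job_survey()
for form_name, edits in form_edits.items():
    form_df = forms[form_name]
    for func_type, columns in edits.items():
        assert func_type in func_types
        for coln, val in columns.items():
            func_types[func_type](form_df, coln, val)

print(forms['wheat_rust_survey_Afghanistan'].columns.tolist())
# ['comment', 'subscriberid']
```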
diff --git a/SurveyDataPreprocessor.R b/SurveyDataPreprocessor.R
deleted file mode 100644
index b6aa73b45d6d07322b7315b16d4be89cdf31b397..0000000000000000000000000000000000000000
--- a/SurveyDataPreprocessor.R
+++ /dev/null
@@ -1,57 +0,0 @@
-
-options(stringsAsFactors = FALSE)
-
-library(dplyr)
-
-args = commandArgs(TRUE)
-
-originalDataFileName = args[1]
-print(paste0("ORIGINAL DATA: ", originalDataFileName))
-
-deleteRowsFileName = args[2]
-print(paste0("DELETE DATA: ", deleteRowsFileName))
-
-addRowsFileName = args[3]
-print(paste0("ADD DATA: ", addRowsFileName))
-
-outputFileName = args[4]
-print(paste0("OUTPUT DATA: ", outputFileName))
-
-
-#colClasses must be used otherwise R gets the inference wrong for the deleteData table,
-#which has much less data for it to work with, and therefore dplyr can't match between tables
-#check.names is essential, as otherwise the characters in the column names are arbitrarily butchered and no longer match what downstream elements require
-originalData = read.csv(originalDataFileName, header = TRUE, check.names=FALSE, colClasses = "character")
-
-print(paste0("Original rows: ", nrow(originalData)))
-
-#Remove unwanted rows:
-deleteData = read.csv(deleteRowsFileName, header = TRUE, check.names=FALSE, colClasses = "character")
-
-print(paste0("Deletion rows: ", nrow(deleteData)))
-
-cleanData = anti_join(originalData, deleteData)
-
-print(paste0("Clean rows: ", nrow(cleanData)))
-
-#Check and see if each row from deleteData was removed from originalData - warn if rows not removed
-nRowNotDeleted = nrow(cleanData) + nrow(deleteData) - nrow(originalData)
-if(nRowNotDeleted != 0) {
-  print(paste0("WARNING: Did not find a match for ", nRowNotDeleted, " rows in the deletion table"))
-}
-
-#Add extra rows:
-extraData = read.csv(addRowsFileName, header = TRUE, check.names=FALSE, colClasses = "character")
-
-print(paste0("Extra rows: ", nrow(extraData)))
-
-joinedData = rbind(cleanData, extraData)
-
-print(paste0("Joined rows: ", nrow(joinedData)))
-
-
-#Write output:
-write.csv(joinedData, file = outputFileName, row.names = FALSE, quote = TRUE)
-
-
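Reviewer note: the deleted R preprocessor matched deletion rows across all columns with `dplyr::anti_join`, whereas the replacement Python in `process_in_job_survey()` matches on the `KEY` column only. A minimal sketch of that replacement behaviour in isolation, with toy stand-in tables and an illustrative output file name:

```python
import csv
from pandas import DataFrame, concat

# toy stand-ins for the merged surveys, the errors-to-remove table,
# and the manual-additions table
dfm = DataFrame({'KEY': ['a', 'b', 'c'], 'comment': ['x', 'y', 'z']})
df_rm = DataFrame({'KEY': ['b']})
df_add = DataFrame({'KEY': ['d'], 'comment': ['w']})

# anti-join on KEY: keep only surveys whose KEY is not flagged for removal
dfm = dfm[~dfm['KEY'].isin(df_rm['KEY'])]

# append the manual additions, as concat() does in process_in_job_survey()
df_join = concat([dfm, df_add])

assert df_join['KEY'].tolist() == ['a', 'c', 'd']
df_join.to_csv('processed_surveys_example.csv', index=False, quoting=csv.QUOTE_MINIMAL)
```

Matching on `KEY` alone is cheaper and more robust than the whole-row match the R script performed, but it relies on the uniqueness assertion the new code makes before filtering.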