diff --git a/ews/coordinator/processor_advisory.py b/ews/coordinator/processor_advisory.py
index 17614870da4e61a1d6ffcb978a7393ae8ee5b542..50a15e72932557f84347bb140f2c2c43bff04d36 100644
--- a/ews/coordinator/processor_advisory.py
+++ b/ews/coordinator/processor_advisory.py
@@ -7,6 +7,7 @@ import logging
 # TODO: Package these projects so they are robust for importing
 from ews.coordinator.processor_base import ProcessorBase
 from ews.advisory_builder import data_gatherer
+from ews.coordinator.utils.jobstatus import Jobstatus
 from ews.coordinator.utils.processor_utils import short_name
 
 logger = logging.getLogger(__name__)
@@ -18,8 +19,8 @@ class ProcessorAdvisory(ProcessorBase):
         return True
 
 
-    def process_in_job(self, jobPath, status, configjson, component) -> object:
-        return self.process_in_job_advisory(jobPath, status, configjson, component)
+    def process_in_job(self, job_path: str, status: Jobstatus, config: dict, component: str) -> object:
+        return self.process_in_job_advisory(job_path, status, config, component)
 
 
     def process_post_job(self, jobPath, configjson):
@@ -30,7 +31,7 @@ class ProcessorAdvisory(ProcessorBase):
         super().__init__()
 
 
-    def process_in_job_advisory(self, jobPath, status, config, component):
+    def process_in_job_advisory(self, job_path: str, status: Jobstatus, config: dict, component: str):
         '''Generates a word processor file containing some basic survey statistics
         and output figures from deposition, environmental suitability, and
         eventually also the epi model. This template advisory is intended to speed
@@ -40,7 +41,7 @@ class ProcessorAdvisory(ProcessorBase):
 
         config_advisory = config.copy()
 
-        config_advisory['jobPath'] = jobPath
+        config_advisory['jobPath'] = job_path
 
         # provide top-level arguments to advisory config
         for k,v in config.items():
diff --git a/ews/coordinator/processor_base.py b/ews/coordinator/processor_base.py
index becd5b21d85401132aa4bd4a02377b42b71453ee..535e4d343dac3c9d73907b7b24390e86f6292329 100755
--- a/ews/coordinator/processor_base.py
+++ b/ews/coordinator/processor_base.py
@@ -291,11 +291,11 @@ class ProcessorBase:
             logger.info(f'Working on config {configIndex + 1} of {config_paths_length}')
 
             try:
-                configjson: dict = parse_json_file_with_tokens(configtemplate, sys_config)
+                config: dict = parse_json_file_with_tokens(configtemplate, sys_config)
                 # then add the sys_config keys and values to the configjson
                 for k, v in sys_config.items():
-                    if k not in configjson.keys():
-                        configjson[k] = v
+                    if k not in config.keys():
+                        config[k] = v
                     else:
                         logger.warning(f"Key {k} already present in run config - not adding key with same name from "
                                        f"the sys_config")
@@ -310,16 +310,16 @@ class ProcessorBase:
 
                 # from configtemplate create configFileName to describe the specific job
                 config_file_name = f"{os.path.basename(configtemplate).replace('.json', '')}_{component}"
-                configjson['ConfigFilePath'] = config_file_name
+                config['ConfigFilePath'] = config_file_name
 
                 # write the complete configuration file to job directory
                 with open(f"{job_path}/{config_file_name}.json", 'w') as write_file:
-                    json.dump(configjson, write_file, indent = 4)
+                    json.dump(config, write_file, indent = 4)
 
                 # proc_description = universal_config['ProcessInJob']
                 proc_description = 'ProcessInJob'
                 try:
-                    proc_out: dict = self.process_in_job(job_path, status, configjson, component)
+                    proc_out: dict = self.process_in_job(job_path, status, config, component)
                 except:
                     logger.exception(f"Error in process_in_job")
                     status.reset('ERROR')
@@ -344,7 +344,7 @@ class ProcessorBase:
 
                 proc_description = 'ProcessEWSPlotting'
                 try:
-                    self.process_post_job(job_path, configjson)
+                    self.process_post_job(job_path, config)
                 except:
                     logger.exception(f"Error in {proc_description}()")
                     status.reset('ERROR')
@@ -373,7 +373,7 @@ class ProcessorBase:
         raise NotImplementedError
 
     @abstractmethod
-    def process_in_job(self, jobPath, status, configjson, component) -> dict:
+    def process_in_job(self, job_path: str, status: Jobstatus, config: dict, component: str) -> dict:
         raise NotImplementedError
 
     @abstractmethod
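Note: the merge loop in processor_base.py above fills gaps in the run config from sys_config but never overwrites a key the run config already defines. A minimal standalone sketch of that behaviour, assuming plain dicts (hypothetical helper, not part of the module):

    # Hypothetical illustration of the sys_config merge in processor_base.py.
    def merge_sys_config(config: dict, sys_config: dict) -> dict:
        for k, v in sys_config.items():
            if k not in config:  # the run config wins on key collisions
                config[k] = v
        return config

    assert merge_sys_config({'Region': 'X'}, {'Region': 'Y', 'Log': 'dbg'}) == {'Region': 'X', 'Log': 'dbg'}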
diff --git a/ews/coordinator/processor_deposition.py b/ews/coordinator/processor_deposition.py
index 93e30c7ad3ec9153a7ad33f3e0ce7e5031a3325a..b145c7e35d67c0d71deafe1790c596e0e1d3fd97 100644
--- a/ews/coordinator/processor_deposition.py
+++ b/ews/coordinator/processor_deposition.py
@@ -11,6 +11,7 @@ import iris
 from iris.cube import CubeList
 
 from ews.coordinator.utils import processor_utils
+from ews.coordinator.utils.jobstatus import Jobstatus
 from ews.coordinator.utils.processor_utils import subprocess_and_log, get_only_existing_globs
 from ews.coordinator.processor_base import ProcessorBase
 from ews.postprocessing.deposition.deposition_post_processor import DepositionPostProcessor
@@ -26,8 +27,8 @@ class ProcessorDeposition(ProcessorBase):
         return True
 
 
-    def process_in_job(self, jobPath, status, configjson, component) -> dict:
-        return self.process_in_job_dep(jobPath, status, configjson, component)
+    def process_in_job(self, job_path: str, status: Jobstatus, config: dict, component: str) -> dict:
+        return self.process_in_job_dep(job_path, status, config, component)
 
 
     def process_post_job(self, jobPath, configjson):
@@ -41,7 +42,7 @@ class ProcessorDeposition(ProcessorBase):
     """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """
 
 
-    def process_in_job_dep(self, jobPath, status, config, component):
+    def process_in_job_dep(self, job_path: str, status: Jobstatus, config: dict, component: str):
         logger.info('started process_in_job_dep()')
 
         file_path = Template(config['ServerPathTemplate']).substitute(**config)
@@ -49,14 +50,14 @@ class ProcessorDeposition(ProcessorBase):
 
         logger.info(f"Expecting to work with {file_name}")
 
-        if os.path.exists(f"{jobPath}/{file_name}"):
+        if os.path.exists(f"{job_path}/{file_name}"):
             logger.info('Directory already exists in job directory, so nothing to do here')
             return
 
         logger.info('Copying file from remote server to job directory')
 
         # TODO: perform ssh file transfer in python instead of subprocess
-        cmd_scp = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath]
+        cmd_scp = ["scp", f"{file_path}/{file_name}.tar.gz", job_path]
 
         description_short = 'dep scp'
         description_long = 'scp from server to job directory'
@@ -65,7 +66,7 @@ class ProcessorDeposition(ProcessorBase):
         logger.info('untarring the input file')
 
         # TODO: untar file in python (with tarfile module) instead of subprocess
-        cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath]
+        cmd_tar = ["tar","-xzf",f"{job_path}/{file_name}.tar.gz","-C",job_path]
         description_short = 'dep tars'
         description_long = 'untar the downloaded file'
         subprocess_and_log(cmd_tar, description_short, description_long)
@@ -78,7 +79,7 @@ class ProcessorDeposition(ProcessorBase):
         #     raise RuntimeError(msg)
 
         # basic check that contents are as expected (56 timepoints in the file)
-        cube_wildcard = f"{jobPath}/{file_name}/deposition_srcs_allregions*.nc"
+        cube_wildcard = f"{job_path}/{file_name}/deposition_srcs_allregions*.nc"
         cubes: CubeList = iris.load(cube_wildcard)
         for cube in cubes:
             coord = cube.coord("time")
@@ -92,7 +93,7 @@ class ProcessorDeposition(ProcessorBase):
         # Output files available for upload
         proc_out['output'] = None
 
         # Processing files available for clearing
-        proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"]
+        proc_out['clearup'] = [f"{job_path}/{file_name}.tar.gz"]
 
         return proc_out
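Note: both TODOs in process_in_job_dep() ask for the subprocess calls to be replaced with Python. The untar step maps directly onto the standard-library tarfile module, as the comment itself suggests; a minimal sketch under that assumption (the scp step would also need a third-party SSH library such as paramiko, so it is not sketched here):

    import tarfile

    # Hypothetical replacement for the cmd_tar subprocess above: extract the
    # downloaded archive into the job directory.
    def untar_input_file(job_path: str, file_name: str) -> None:
        with tarfile.open(f"{job_path}/{file_name}.tar.gz", "r:gz") as tar:
            tar.extractall(path=job_path)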
diff --git a/ews/coordinator/processor_environment.py b/ews/coordinator/processor_environment.py
index 8de8150077c0b5aa33a60c431c39a2b39734555f..e06eea483593d4c818f7c499949ed874ddb7d338 100644
--- a/ews/coordinator/processor_environment.py
+++ b/ews/coordinator/processor_environment.py
@@ -26,8 +26,8 @@ class ProcessorEnvironment(ProcessorBase):
 
     def process_pre_job(self, args) -> bool:
         return True
 
-    def process_in_job(self, job_path, status, configjson, component) -> object:
-        return self.process_in_job_env2_0(job_path, status, configjson, component)
+    def process_in_job(self, job_path, status, config_dict, component) -> object:
+        return self.process_in_job_env2_0(job_path, status, config_dict, component)
 
     def process_post_job(self, job_path, configjson):
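Note: processor_environment.py is the only override this change leaves untyped, and it renames the parameter to config_dict rather than config. For comparison, a sketch of the signature harmonised with the other processors (an assumption, not part of this diff; it would also need the Jobstatus import in that file):

    # Hypothetical harmonised override matching the abstract method in processor_base.py.
    def process_in_job(self, job_path: str, status: Jobstatus, config: dict, component: str) -> object:
        return self.process_in_job_env2_0(job_path, status, config, component)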
diff --git a/ews/coordinator/processor_epidemiology.py b/ews/coordinator/processor_epidemiology.py
index 2499a855342b61a0c7f3ac2f783d344a4b8822d8..723bc3df240801dd598c5aacf65ad7231468a752 100644
--- a/ews/coordinator/processor_epidemiology.py
+++ b/ews/coordinator/processor_epidemiology.py
@@ -14,6 +14,7 @@ from numpy import all, any, argmax, unique, allclose
 from pandas import MultiIndex, read_csv, DataFrame, to_datetime
 from rasterio import open as rio_open
 
+from ews.coordinator.utils.jobstatus import Jobstatus
 # gitlab projects
 # TODO: Package these projects so they are robust for importing
 from ews.epi_model import plotRaster, model, EpiAnalysis
@@ -35,8 +36,8 @@ class ProcessorEpidemiology(ProcessorBase):
         return self.process_pre_job_epi(args)
 
 
-    def process_in_job(self, jobPath, status, configjson, component) -> dict:
-        return self.process_in_job_epi(jobPath, status, configjson, component)
+    def process_in_job(self, job_path: str, status: Jobstatus, config: dict, component: str) -> dict:
+        return self.process_in_job_epi(job_path, status, config, component)
 
 
     def process_post_job(self, jobPath, configjson):
@@ -263,7 +264,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
         return dfm3
 
-    def process_in_job_epi(self, jobPath,status,config,component):
+    def process_in_job_epi(self, job_path: str, status: Jobstatus, config: dict, component: str):
         logger.info('started process_in_job_epi()')
 
         # TODO: Some of this is modifying config before epi model is run. Determine
@@ -335,7 +336,7 @@ class ProcessorEpidemiology(ProcessorBase):
             # This should be improved, either by making the one config file
             # aware of all of the iterations, or looping over iterations in
             # Processor.py with one iteration-specific config.
-            case_specific_path = f"{jobPath}/{region}/{disease}/"
+            case_specific_path = f"{job_path}/{region}/{disease}/"
             Path(case_specific_path).mkdir(parents=True, exist_ok=True)
 
             logger.info(f"Preparing for epidemiology calc of {disease} in {region}")
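Note: case_specific_path above is built with an f-string and a trailing slash even though pathlib.Path is already in use on the next line. A sketch of the same step written with path joining (hypothetical, not part of this diff):

    from pathlib import Path

    # Hypothetical pathlib form of the case_specific_path construction above.
    def make_case_dir(job_path: str, region: str, disease: str) -> Path:
        case_dir = Path(job_path) / region / disease
        case_dir.mkdir(parents=True, exist_ok=True)
        return case_dir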
diff --git a/ews/coordinator/processor_surveys.py b/ews/coordinator/processor_surveys.py
index e4835816cbdd5e9e601573b3405d44bba9e5ffd1..d77869441773bd636e58c805cbdb2912614c767c 100644
--- a/ews/coordinator/processor_surveys.py
+++ b/ews/coordinator/processor_surveys.py
@@ -43,6 +43,7 @@ from ews.coordinator.survey_servers.processor_surveys_kobotoolbox import get_kob
 from ews.coordinator.survey_servers.processor_surveys_new_odk import get_newODK_form_as_csv
 from ews.coordinator.survey_servers.processor_surveys_new_odk2 import get_newODK2_form_as_csv
 from ews.coordinator.processor_base import ProcessorBase
+from ews.coordinator.utils.jobstatus import Jobstatus
 from ews.coordinator.utils.processor_utils import parse_json_file_with_tokens
 from ews.source_gen.clustering import run_case
 
@@ -56,8 +57,8 @@ class ProcessorSurveys(ProcessorBase):
         return True
 
 
-    def process_in_job(self, jobPath, status, configjson, component) -> object:
-        return self.process_in_job_survey(jobPath, status, configjson, component)
+    def process_in_job(self, job_path: str, status: Jobstatus, config: dict, component: str) -> object:
+        return self.process_in_job_survey(job_path, status, config, component)
 
 
     def process_post_job(self, jobPath, configjson) -> [str]:
@@ -78,7 +79,8 @@ class ProcessorSurveys(ProcessorBase):
         'newODK2' : get_newODK2_form_as_csv,
     }
 
-    def process_in_job_survey(self, jobPath,status,config,component):
+
+    def process_in_job_survey(self, job_path: str, status: Jobstatus, config: dict, component: str):
 
         logger.info('started process_in_job_survey()')
 
@@ -100,7 +102,7 @@ class ProcessorSurveys(ProcessorBase):
 
             func_get_form_as_csv = self.GET_FORM_AS_CSV_DICT[form['type']]
 
-            csv_filename = func_get_form_as_csv(form, jobPath, config, status)
+            csv_filename = func_get_form_as_csv(form, job_path, config, status)
 
             csv_filenames[form['form_id']] = csv_filename
 
@@ -195,7 +197,7 @@ class ProcessorSurveys(ProcessorBase):
             dfm = concat([dfm,dfi],axis='rows')
 
         # save the result
-        Export_csv_path = f"{jobPath}/ExportCSV/"
+        Export_csv_path = f"{job_path}/ExportCSV/"
         Path(Export_csv_path).mkdir(parents = True, exist_ok = True)
         forms_fn = f"{Export_csv_path}/Merged_SurveyData.csv"
         dfm.to_csv(forms_fn,index=False,quoting=csv.QUOTE_MINIMAL)
@@ -262,14 +264,14 @@ class ProcessorSurveys(ProcessorBase):
 
         # prepare environment for clustering calc
 
-        upload_directory = f"{jobPath}/upload"
+        upload_directory = f"{job_path}/upload"
         Path(upload_directory).mkdir(parents=True, exist_ok=True)
 
         if 'Groups' in config:
             # if 'Groups' is defined in the config, create grouped survey files and run python version
             logger.info('Preparing grouped survey files')
 
-            group_directory = f"{jobPath}/Groups"
+            group_directory = f"{job_path}/Groups"
             Path(group_directory).mkdir(parents=True, exist_ok=True)
 
             # creating initial groups
@@ -325,7 +327,7 @@ class ProcessorSurveys(ProcessorBase):
 
                 df_group.to_csv(group_surveys_filepath, index=False, quoting=csv.QUOTE_MINIMAL)
 
-                output_directory = f"{jobPath}/source_gen/{group_name}"
+                output_directory = f"{job_path}/source_gen/{group_name}"
                 Path(output_directory).mkdir(parents=True, exist_ok=True)
 
                 if 'SourcesConfigs' in config and group_name in config['SourcesConfigs']:
@@ -354,7 +356,7 @@ class ProcessorSurveys(ProcessorBase):
                 logger.debug('Placing copy of result in job directory with conventional name')
 
                 output_filename = f"sources_{group_name}_{config['StartString']}.csv"
-                output_path = f"{jobPath}/upload/{output_filename}"
+                output_path = f"{job_path}/upload/{output_filename}"
 
                 logger.debug(f"as {output_path}")
 
@@ -365,7 +367,7 @@ class ProcessorSurveys(ProcessorBase):
                     logger.debug('Additionally placing copy of PROD result in job directory without group name')
 
                     output_filename = f"sources_{config['StartString']}.csv"
-                    output_path = f"{jobPath}/upload/{output_filename}"
+                    output_path = f"{job_path}/upload/{output_filename}"
 
                     logger.debug(f"as {output_path}")
 
@@ -373,7 +375,7 @@ class ProcessorSurveys(ProcessorBase):
         else:
             # run python version without grouping surveys
 
-            output_directory = f"{jobPath}/source_gen"
+            output_directory = f"{job_path}/source_gen"
            Path(output_directory).mkdir(parents=True, exist_ok=True)
 
             sources_path = run_case(
@@ -393,7 +395,7 @@ class ProcessorSurveys(ProcessorBase):
             logger.debug('Placing copy of result in job directory with conventional name')
 
             output_filename = f"sources_{config['StartString']}.csv"
-            output_path = f"{jobPath}/upload/{output_filename}"
+            output_path = f"{job_path}/upload/{output_filename}"
 
             logger.debug(f"as {output_path}")
 
@@ -401,7 +403,7 @@ class ProcessorSurveys(ProcessorBase):
 
         upload_filenames = f"sources_*{config['StartString']}.csv"
-        upload_path = f"{jobPath}/upload/{upload_filenames}"
+        upload_path = f"{job_path}/upload/{upload_filenames}"
 
         # glob list of output files
         upload_path_list = glob(upload_path)
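Note: the final upload step collects grouped and ungrouped results with a single wildcard. A minimal sketch of what the pattern matches, with hypothetical values standing in for job_path and config['StartString']:

    from glob import glob

    # Hypothetical values; the pattern matches e.g. sources_20240101.csv
    # (ungrouped) and sources_GroupA_20240101.csv (grouped) as written above.
    job_path = "/tmp/ews_job"    # assumed job directory
    start_string = "20240101"    # assumed config['StartString']
    upload_path_list = glob(f"{job_path}/upload/sources_*{start_string}.csv")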