diff --git a/coordinator/Processor.py b/coordinator/Processor.py index 9341f71b14ded4150dbf435ee8d4a1ba4e670bf7..46d5c93fd9a0a33e1c34d2ac13d56cd3c5207966 100755 --- a/coordinator/Processor.py +++ b/coordinator/Processor.py @@ -512,7 +512,7 @@ class Processor: endJob(status, ignore_inprogress = True, premature = False) # files and directories that will be uploaded to public server - FilesToSend = [] + files_to_send = [] # files and directories that will be earmarked for removal after a # successful job @@ -550,7 +550,7 @@ class Processor: # proc_description = universal_config['ProcessInJob'] proc_description = 'ProcessInJob' try: - proc_out = self.process_in_job(job_path, status, configjson, component) + proc_out: dict = self.process_in_job(job_path, status, configjson, component) except: self.logger.exception(f"Error in process_in_job") status.reset('ERROR') @@ -566,7 +566,7 @@ class Processor: if 'output' in proc_out.keys(): append_item_to_list( proc_out['output'], - FilesToSend, + files_to_send, proc_description, status) @@ -582,7 +582,7 @@ class Processor: # proc_description = universal_config['ProcessEWSPlotting'] proc_description = 'ProcessEWSPlotting' try: - EWSPlottingOutputs = self.process_post_job(job_path, configjson) + ews_plotting_outputs = self.process_post_job(job_path, configjson) except: self.logger.exception(f"Error in {proc_description}()") status.reset('ERROR') @@ -590,10 +590,10 @@ class Processor: self.logger.info('Finished with EWS-Plotting, appending images to list for transfer') - if EWSPlottingOutputs: + if ews_plotting_outputs: append_item_to_list( - EWSPlottingOutputs, - FilesToSend, + ews_plotting_outputs, + files_to_send, proc_description, status) @@ -603,7 +603,7 @@ class Processor: if not noupload: try: - ProcessorComponents.upload(universal_config, FilesToSend, component) + ProcessorComponents.upload(universal_config, files_to_send, component) except IndexError: status.reset('WARNING') @@ -625,7 +625,7 @@ class Processor: universal_config_extra['ServerPath'] = extra_path try: - ProcessorComponents.upload(universal_config_extra, FilesToSend, component) + ProcessorComponents.upload(universal_config_extra, files_to_send, component) except IndexError: status.reset('WARNING') @@ -656,11 +656,11 @@ class Processor: raise NotImplementedError @abstractmethod - def process_in_job(self, jobPath, status, configjson, component) -> object: + def process_in_job(self, jobPath, status, configjson, component) -> dict: raise NotImplementedError @abstractmethod - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: raise NotImplementedError @abstractmethod diff --git a/coordinator/ProcessorComponents.py b/coordinator/ProcessorComponents.py index f2e839138d6dcebb3b53c99d27807d44a5787581..2eb745b8668ca78270d56b25ecf1d2c9b9b690d0 100644 --- a/coordinator/ProcessorComponents.py +++ b/coordinator/ProcessorComponents.py @@ -12,10 +12,10 @@ from typing import List # coordinator stages: pre, in (during) and plotting. -# from ProcessorServer import ( -# process_pre_job_server_download, -# upload -# ) +from ProcessorServer import ( + # process_pre_job_server_download, + upload +) from ProcessorUtils import ( add_filters_to_sublogger, diff --git a/coordinator/ProcessorDeposition.py b/coordinator/ProcessorDeposition.py index e84d5fcb8fdc9144fbcb085a90eba201e4da04a9..68fe833a65e1321bf95db569b5e8b811fe926e7c 100644 --- a/coordinator/ProcessorDeposition.py +++ b/coordinator/ProcessorDeposition.py @@ -28,11 +28,11 @@ class ProcessorDeposition(Processor): return True - def process_in_job(self, jobPath, status, configjson, component) -> object: + def process_in_job(self, jobPath, status, configjson, component) -> dict: return self.process_in_job_dep(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: return self.process_EWS_plotting_dep(jobPath, configjson) @@ -108,7 +108,7 @@ class ProcessorDeposition(Processor): return proc_out - def process_EWS_plotting_dep(self, jobPath, config): + def process_EWS_plotting_dep(self, jobPath, config) -> [str]: '''Returns a list of output files for transfer.''' self.logger.info('started process_EWS_plotting_dep()') @@ -125,7 +125,7 @@ class ProcessorDeposition(Processor): deposition_data_file_name = Template(config['Deposition']['DataFileTemplate']).substitute(**config) name_file_wildcard = f"{deposition_path}/{deposition_data_file_name}" - EWSPlottingOutputGlobs = [] + ews_plotting_output_globs = [] for region in regions: @@ -164,26 +164,26 @@ class ProcessorDeposition(Processor): depo_processor.process() # check the output - EWSPlottingOutputDir = f"{output_dir}/images/" + ews_plotting_output_dir = f"{output_dir}/images/" #EWSPlottingOutputGlobs += [ # # daily plots # f"{EWSPlottingOutputDir}Daily/deposition_{region.lower()}_*_daily_20*.png", # # weekly plots # f"{EWSPlottingOutputDir}Weekly/deposition_{region.lower()}_*_total_20*.png"] - EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"] + ews_plotting_output_globs += [f"{ews_plotting_output_dir}*"] - EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + ews_plotting_output_globs = get_only_existing_globs(ews_plotting_output_globs, inplace = False) # check there is some output from EWS-plotting - if not EWSPlottingOutputGlobs: + if not ews_plotting_output_globs: self.logger.error('EWS-Plotting did not produce any output') raise RuntimeError # provide list for transfer - EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)]) + ews_plotting_outputs: [str] = sorted([file for glob_str in ews_plotting_output_globs for file in glob(glob_str)]) - return EWSPlottingOutputs + return ews_plotting_outputs if __name__ == '__main__': diff --git a/coordinator/ProcessorEnvironment.py b/coordinator/ProcessorEnvironment.py index c927e41aa0a818f55e201d8941c2fc2d19d64e44..041ce5dc8841ac14eb370c431cba77f6f75f21cf 100644 --- a/coordinator/ProcessorEnvironment.py +++ b/coordinator/ProcessorEnvironment.py @@ -35,14 +35,14 @@ class ProcessorEnvironment(Processor): return self.process_in_job_env2_0(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: return self.process_EWS_plotting_env2_0(jobPath, configjson) def __init__(self) -> None: super().__init__() - def process_in_job_env2_0(self, jobPath,status,config,component): + def process_in_job_env2_0(self, jobPath,status,config,component) -> dict: '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.''' self.logger.info('started process_in_job_env2_0()') @@ -122,7 +122,7 @@ class ProcessorEnvironment(Processor): run_params_dict[RUN_PARAMS.FILTER_FOR_COUNTRY_KEY] = "False"''' #TODO test if this works - def process_EWS_plotting_env2_0(self, jobPath,config): + def process_EWS_plotting_env2_0(self, jobPath,config) -> [str]: '''Configures the plotting arguments and calls EWS-plotting as a python module. Returns a list of output files for transfer.''' @@ -134,7 +134,7 @@ class ProcessorEnvironment(Processor): subregions = config['SubRegionNames'] - EWSPlottingOutputGlobs = [] + ews_plotting_output_globs = [] # work on each region for region in subregions: @@ -176,27 +176,27 @@ class ProcessorEnvironment(Processor): env_suit_processor.process() # check the output - EWSPlottingOutputDir = f"{output_dir}/images/" + ews_plotting_output_dir = f"{output_dir}/images/" #EWSPlottingOutputGlobs += [ # # daily plots # f"{EWSPlottingOutputDir}Daily/suitability_{region.lower()}_*_rust_daily_20*.png", # # weekly plots # f"{EWSPlottingOutputDir}Weekly/suitability_{region.lower()}_*_rust_total_20*.png"] - EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"] + ews_plotting_output_globs = [f"{ews_plotting_output_dir}*"] # check the output - EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + ews_plotting_output_globs = get_only_existing_globs(ews_plotting_output_globs,inplace=False) # check there is some output from EWS-plotting - if not EWSPlottingOutputGlobs: + if not ews_plotting_output_globs: self.logger.error('EWS-Plotting did not produce any output') raise RuntimeError # provide list for transfer - EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)]) + ews_plotting_outputs: [str] = sorted([file for glob_str in ews_plotting_output_globs for file in glob(glob_str)]) - return EWSPlottingOutputs + return ews_plotting_outputs if __name__ == '__main__': diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py index b26a20dca20bfc83574bad46c1bbb8410b0ff891..7ab4aa180650ddefcddeed5a244b12f8432659a5 100644 --- a/coordinator/ProcessorEpidemiology.py +++ b/coordinator/ProcessorEpidemiology.py @@ -45,11 +45,11 @@ class ProcessorEpidemiology(Processor): return self.process_pre_job_epi(args) - def process_in_job(self, jobPath, status, configjson, component) -> object: + def process_in_job(self, jobPath, status, configjson, component) -> dict: return self.process_in_job_epi(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: return self.process_EWS_plotting_epi(jobPath, configjson) @@ -663,7 +663,7 @@ class ProcessorEpidemiology(Processor): return proc_out - def process_EWS_plotting_epi(self, jobPath,config): + def process_EWS_plotting_epi(self, jobPath,config) -> [str]: '''Returns a list of output files for transfer.''' self.logger.info('started process_EWS_plotting_epi()') @@ -707,7 +707,7 @@ class ProcessorEpidemiology(Processor): Path(ews_plot_dir).mkdir(parents=True, exist_ok=True) # loop over diseases - EWSPlottingOutputGlobs = [] + ews_plotting_output_globs = [] for disease in diseases: disease_short = disease.lower().replace('rust','') @@ -772,21 +772,22 @@ class ProcessorEpidemiology(Processor): epi_processor_2.process() # check the output - EWSPlottingOutputDir = f"{ews_plot_dir}/images/" + ews_plotting_output_dir = f"{ews_plot_dir}/images/" # TODO: Make this smarter, connected to the results of EWSPlottingEPIBase.plot_epi() - EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}infection_{plotting_region_name_lower}_*{disease_short}*.png"] + ews_plotting_output_globs: [str] = [] + ews_plotting_output_globs += [f"{ews_plotting_output_dir}infection_{plotting_region_name_lower}_*{disease_short}*.png"] - EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + ews_plotting_output_globs = get_only_existing_globs(ews_plotting_output_globs,inplace=False) # check there is some output from EWS-plotting - if not EWSPlottingOutputGlobs: + if not ews_plotting_output_globs: self.logger.error('EWS-Plotting did not produce any output') raise RuntimeError # provide to list for transfer - EWSPlottingOutputs = [item for EWSPlottingOutput in EWSPlottingOutputGlobs for item in glob(EWSPlottingOutput)] + ews_plotting_outputs: [str] = [item for ews_plotting_output in ews_plotting_output_globs for item in glob(ews_plotting_output)] - return EWSPlottingOutputs + return ews_plotting_outputs if __name__ == '__main__': diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py index 2bfffc66d9db9b164afee1bd63ec1b8b070c4666..2cda25ec07a00ed041be7ad8ed395e76048296a6 100644 --- a/coordinator/ProcessorSurveys.py +++ b/coordinator/ProcessorSurveys.py @@ -68,7 +68,7 @@ class ProcessorSurveys(Processor): return self.process_in_job_survey(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: return self.process_EWS_plotting_survey(jobPath, configjson) diff --git a/coordinator/ProcessorUtils.py b/coordinator/ProcessorUtils.py index 76ee3b64dbbcbd92631d3e7b4d2c07a0e712403e..cbcede7547f4e191073296017de5b0eac81af1f4 100644 --- a/coordinator/ProcessorUtils.py +++ b/coordinator/ProcessorUtils.py @@ -119,7 +119,8 @@ def open_and_check_config(configFile): return config -def get_only_existing_globs(file_globs,inplace=True): + +def get_only_existing_globs(file_globs: [str], inplace = True): if inplace: for i,fg in enumerate(file_globs): diff --git a/coordinator/extra/ProcessorMetResample.py b/coordinator/extra/ProcessorMetResample.py index 15d679c3b67731368a550405095a3b5a97d218d9..c8b1b943f0b837352b363853e93a78625dc9a821 100644 --- a/coordinator/extra/ProcessorMetResample.py +++ b/coordinator/extra/ProcessorMetResample.py @@ -23,9 +23,6 @@ from EpiModel.EpiUtils import ( parse_template_string) from Processor import Processor -from ProcessorServer import ( - get_data_from_server -) from ProcessorUtils import ( add_filters_to_sublogger, calc_epi_date_range,