From a3279c0d44ddbd0fe7b3eacccb64a2607bb891c4 Mon Sep 17 00:00:00 2001 From: lb584 <lb584@cam.ac.uk> Date: Thu, 28 Mar 2024 15:47:04 +0000 Subject: [PATCH] pre-airflow migration code retirement --- coordinator/Processor.py | 8 +- coordinator/ProcessorAdvisory.py | 37 +++-- coordinator/ProcessorDeposition.py | 6 +- coordinator/ProcessorEnvironment.py | 5 +- coordinator/ProcessorEpidemiology.py | 9 +- coordinator/ProcessorServer.py | 166 +++++++++++----------- coordinator/ProcessorUtils.py | 50 +++---- coordinator/extra/ProcessorMetResample.py | 7 +- 8 files changed, 142 insertions(+), 146 deletions(-) diff --git a/coordinator/Processor.py b/coordinator/Processor.py index ee37790..3ce7217 100755 --- a/coordinator/Processor.py +++ b/coordinator/Processor.py @@ -448,9 +448,9 @@ class Processor: # run any checks before creating a job directory # if this fails, then make a note once there is a job directory ready = self.process_pre_job(args) - - if not ready: - self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it") + # + # if not ready: + # self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it") # create job directory Path(jobPath).mkdir(parents = True, exist_ok = True) @@ -622,7 +622,7 @@ class Processor: endScript(premature = False) @abstractmethod - def process_pre_job(self, args): + def process_pre_job(self, args) -> bool: raise NotImplementedError @abstractmethod diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/ProcessorAdvisory.py index 8815f3e..582bd9e 100644 --- a/coordinator/ProcessorAdvisory.py +++ b/coordinator/ProcessorAdvisory.py @@ -13,14 +13,13 @@ from ProcessorUtils import ( add_filters_to_sublogger, endScript, open_and_check_config, - query_past_successes, short_name ) class ProcessorAdvisory(Processor): - def process_pre_job(self, args): + def process_pre_job(self, args) -> bool: return self.process_pre_job_advisory(args) @@ -37,18 +36,18 @@ class ProcessorAdvisory(Processor): logger = logging.getLogger('Processor.Advisory') add_filters_to_sublogger(logger) - def process_pre_job_advisory(self,input_args: dict): + def process_pre_job_advisory(self,input_args: dict) -> bool: - self.logger.info('started process_pre_job_advisory()') - - # check configs can be loaded - config_fns: List[str] = input_args['config_paths'] - for configFile in config_fns: - try: - config_i = open_and_check_config(configFile) - except: - self.logger.exception(f"Failure in opening or checking config {configFile}") - endScript(premature=True) + # self.logger.info('started process_pre_job_advisory()') + # + # # check configs can be loaded + # config_fns: List[str] = input_args['config_paths'] + # for configFile in config_fns: + # try: + # config_i = open_and_check_config(configFile) + # except: + # self.logger.exception(f"Failure in opening or checking config {configFile}") + # endScript(premature=True) # check pre-requisite jobs are complete @@ -59,13 +58,13 @@ class ProcessorAdvisory(Processor): # needs a YesterdayDateString available for the survey part of # advisory config to point to the status file. 
- dependent_components = config_i['Advisory'].get( - 'DependentComponents', - ['Deposition','Environment']) - - for dependent_component in dependent_components: + # dependent_components = config_i['Advisory'].get( + # 'DependentComponents', + # ['Deposition','Environment']) - query_past_successes(input_args, dependent_component) + # for dependent_component in dependent_components: + # + # query_past_successes(input_args, dependent_component) return True diff --git a/coordinator/ProcessorDeposition.py b/coordinator/ProcessorDeposition.py index 9fcba63..63adc08 100644 --- a/coordinator/ProcessorDeposition.py +++ b/coordinator/ProcessorDeposition.py @@ -11,7 +11,6 @@ import iris from iris.cube import CubeList from Processor import Processor -from ProcessorServer import process_pre_job_server_download from ProcessorUtils import ( get_only_existing_globs, subprocess_and_log, @@ -24,8 +23,9 @@ class ProcessorDeposition(Processor): """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ - def process_pre_job(self, args): - return process_pre_job_server_download(args) + def process_pre_job(self, args) -> bool: + # return process_pre_job_server_download(args) + return True def process_in_job(self, jobPath, status, configjson, component) -> object: diff --git a/coordinator/ProcessorEnvironment.py b/coordinator/ProcessorEnvironment.py index 4d1a646..a2a34c6 100644 --- a/coordinator/ProcessorEnvironment.py +++ b/coordinator/ProcessorEnvironment.py @@ -10,7 +10,6 @@ import os from Processor import Processor from ProcessorServer import ( get_data_from_server, - process_pre_job_server_download ) from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo @@ -25,8 +24,8 @@ from ProcessorUtils import ( class ProcessorEnvironment(Processor): def process_pre_job(self, args): - return process_pre_job_server_download(args) - + # return process_pre_job_server_download(args) + return True def process_in_job(self, jobPath, status, configjson, component) -> object: return self.process_in_job_env2_0(jobPath, status, configjson, component) diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py index 2714e2a..2358409 100644 --- a/coordinator/ProcessorEpidemiology.py +++ b/coordinator/ProcessorEpidemiology.py @@ -31,13 +31,12 @@ from ProcessorUtils import ( get_only_existing_globs, endJob, add_filters_to_sublogger, - query_past_successes, short_name, disease_latin_name_dict ) class ProcessorEpidemiology(Processor): - def process_pre_job(self, args): + def process_pre_job(self, args) -> bool: return self.process_pre_job_epi(args) @@ -60,9 +59,9 @@ class ProcessorEpidemiology(Processor): self.logger.info('started process_pre_job_epi()') # check pre-requisite jobs are complete - query_past_successes(input_args, 'Deposition') - - query_past_successes(input_args, 'Environment') + # query_past_successes(input_args, 'Deposition') + # + # query_past_successes(input_args, 'Environment') config_fns: List[str] = input_args['config_paths'] diff --git a/coordinator/ProcessorServer.py b/coordinator/ProcessorServer.py index 685b6cc..6a93375 100644 --- a/coordinator/ProcessorServer.py +++ b/coordinator/ProcessorServer.py @@ -23,89 +23,89 @@ from ProcessorUtils import ( logger = logging.getLogger('Processor.Server') add_filters_to_sublogger(logger) -def process_pre_job_server_download(input_args: dict): - '''This is set up for environmental suitability v2.0 and deposition. 
- Returns a boolean as to whether the job is ready for full processing.''' - - logger.info('started process_pre_job_willow_download()') - - # Check if there is a file available on willow - logger.debug('Checking for file(s) on remote server') - - config_paths: List[str] = input_args['config_paths'] - start_date: str = input_args['start_date'] - component: str = input_args['component'] - - for i,config_path in enumerate(config_paths): - - config = open_and_check_config(config_path) - - config['StartString'] = start_date - - file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) - file_name = Template(config[component]['InputFileTemplate']).substitute(**config) - logger.info(f"Checking for existence of {file_path}/{file_name}.tar.gz") - - timenow = datetime.datetime.now(tz=datetime.timezone.utc).time() - - server_name: str = config['ServerName'] - full_file_path = f"{file_path}/{file_name}.tar.gz" - - """ - check the incoming met data tar exists and is a valid tar file (on either remote or local machine) - """ - if server_name == "": # means the file is local - data_is_ready = os.path.exists(full_file_path) and tarfile.is_tarfile(full_file_path) - else: - cmd_check_file = ["ssh", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", server_name, - f"test -f {full_file_path} && tar -tzf {full_file_path} >/dev/null"] - run_in_shell: bool = False - - description_short = 'subprocess_ssh' - description_long = f"Checking for existence of {file_path}/{file_name}.tar.gz" - - status = subprocess_and_log(cmd_check_file, description_short, description_long, check = False, - shell = run_in_shell) - data_is_ready = status.returncode == 0 - - if not data_is_ready: - - # a time check in UTC. If it's late, raise warning, if very late, raise error - - time_0 = config[component]['TimeExpectedAvailable'] - time_0 = datetime.datetime.strptime(time_0,'%H%M') - - time_until_warn = datetime.timedelta(hours=4) - time_until_error = datetime.timedelta(hours=5) - - time_warn = (time_0 + time_until_warn).time() - time_error = (time_0 + time_until_error).time() - - message = f"Data not yet available for config {i+1} of {len(config_paths)}, expected between {time_0.time()} and {time_error} and long before {time_error}" - - if timenow > time_error: - # job is not able to proceed - - logger.warning(message) - - return False - - elif timenow > time_warn: - # job is not ready to proceed - - logger.warning(message) - endScript(premature=True) - - else: - # some other problem with the job - - logger.info(message) - endScript(premature=True) - - elif data_is_ready: - logger.info(f"Data is available for config {i+1} of {len(config_paths)}, calculation shall proceed") - - return True +# def process_pre_job_server_download(input_args: dict): +# '''This is set up for environmental suitability v2.0 and deposition. 
+# Returns a boolean as to whether the job is ready for full processing.''' +# +# logger.info('started process_pre_job_willow_download()') +# +# # Check if there is a file available on willow +# logger.debug('Checking for file(s) on remote server') +# +# config_paths: List[str] = input_args['config_paths'] +# start_date: str = input_args['start_date'] +# component: str = input_args['component'] +# +# for i,config_path in enumerate(config_paths): +# +# config = open_and_check_config(config_path) +# +# config['StartString'] = start_date +# +# file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) +# file_name = Template(config[component]['InputFileTemplate']).substitute(**config) +# logger.info(f"Checking for existence of {file_path}/{file_name}.tar.gz") +# +# timenow = datetime.datetime.now(tz=datetime.timezone.utc).time() +# +# server_name: str = config['ServerName'] +# full_file_path = f"{file_path}/{file_name}.tar.gz" +# +# """ +# check the incoming met data tar exists and is a valid tar file (on either remote or local machine) +# """ +# if server_name == "": # means the file is local +# data_is_ready = os.path.exists(full_file_path) and tarfile.is_tarfile(full_file_path) +# else: +# cmd_check_file = ["ssh", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", server_name, +# f"test -f {full_file_path} && tar -tzf {full_file_path} >/dev/null"] +# run_in_shell: bool = False +# +# description_short = 'subprocess_ssh' +# description_long = f"Checking for existence of {file_path}/{file_name}.tar.gz" +# +# status = subprocess_and_log(cmd_check_file, description_short, description_long, check = False, +# shell = run_in_shell) +# data_is_ready = status.returncode == 0 +# +# if not data_is_ready: +# +# # a time check in UTC. If it's late, raise warning, if very late, raise error +# +# time_0 = config[component]['TimeExpectedAvailable'] +# time_0 = datetime.datetime.strptime(time_0,'%H%M') +# +# time_until_warn = datetime.timedelta(hours=4) +# time_until_error = datetime.timedelta(hours=5) +# +# time_warn = (time_0 + time_until_warn).time() +# time_error = (time_0 + time_until_error).time() +# +# message = f"Data not yet available for config {i+1} of {len(config_paths)}, expected between {time_0.time()} and {time_error} and long before {time_error}" +# +# if timenow > time_error: +# # job is not able to proceed +# +# logger.warning(message) +# +# return False +# +# elif timenow > time_warn: +# # job is not ready to proceed +# +# logger.warning(message) +# endScript(premature=True) +# +# else: +# # some other problem with the job +# +# logger.info(message) +# endScript(premature=True) +# +# elif data_is_ready: +# logger.info(f"Data is available for config {i+1} of {len(config_paths)}, calculation shall proceed") +# +# return True def upload(config,FilesToSend,component): diff --git a/coordinator/ProcessorUtils.py b/coordinator/ProcessorUtils.py index 4b3fbe2..76ee3b6 100644 --- a/coordinator/ProcessorUtils.py +++ b/coordinator/ProcessorUtils.py @@ -321,31 +321,31 @@ def query_component_success(config_i,job_run: str, job_to_check: str): return True -def query_past_successes(input_args: dict, - component_to_query: str = 'Deposition'): - '''Checks if deposition and environment jobs are already completed - successfully. 
If not, it raises an error.''' - - component: str = input_args['component'] - - assert component_to_query in ['Deposition','Environment','Epidemiology'] - - # check configs can be loaded - config_fns: List[str] = input_args['config_paths'] - for configFile in config_fns: - try: - config_i = open_and_check_config(configFile) - except: - logger.exception(f"Failure in opening or checking config {configFile}") - endScript(premature=True) - - # some config initialisation is necessary - config_i['StartString'] = input_args['start_date'] - - # check if dependent job is readily available - query_component_success(config_i,component,component_to_query) - - return True +# def query_past_successes(input_args: dict, +# component_to_query: str = 'Deposition'): +# '''Checks if deposition and environment jobs are already completed +# successfully. If not, it raises an error.''' +# +# component: str = input_args['component'] +# +# assert component_to_query in ['Deposition','Environment','Epidemiology'] +# +# # check configs can be loaded +# config_fns: List[str] = input_args['config_paths'] +# for configFile in config_fns: +# try: +# config_i = open_and_check_config(configFile) +# except: +# logger.exception(f"Failure in opening or checking config {configFile}") +# endScript(premature=True) +# +# # some config initialisation is necessary +# config_i['StartString'] = input_args['start_date'] +# +# # check if dependent job is readily available +# query_component_success(config_i,component,component_to_query) +# +# return True def dataframe_to_series(df: pd.DataFrame) -> pd.Series: """Reformate a pandas Dataframe into a pandas Series. diff --git a/coordinator/extra/ProcessorMetResample.py b/coordinator/extra/ProcessorMetResample.py index 64f6942..f303914 100644 --- a/coordinator/extra/ProcessorMetResample.py +++ b/coordinator/extra/ProcessorMetResample.py @@ -24,24 +24,23 @@ from EpiModel.EpiUtils import ( from Processor import Processor from ProcessorServer import ( - process_pre_job_server_download, get_data_from_server ) from ProcessorUtils import ( add_filters_to_sublogger, calc_epi_date_range, open_and_check_config, - query_past_successes, short_name) class ProcessorMetResample(Processor): - def process_pre_job(self, args): + def process_pre_job(self, args) -> bool: self.logger.debug('Performing process_pre_job()') # If it will work on a single forecast, get a dedicated download #return process_pre_job_server_download(args) - return query_past_successes(args, 'Environment') + # return query_past_successes(args, 'Environment') + return True def process_in_job(self, jobPath, status, configjson, component) -> object: return self.process_in_job_met_resample(jobPath, status, configjson, component) -- GitLab
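
The pre-job checks retired above (the remote tar-availability check in process_pre_job_server_download and the query_past_successes dependency checks) are expected to become scheduler-level dependencies once the pipeline runs under Airflow, which is the migration this patch prepares for. A minimal sketch of that shape follows, assuming Airflow 2.x; the DAG ids, task ids, and the processor callable are hypothetical placeholders for illustration and are not part of this patch.

    # Illustrative only: how the retired dependency checks could map onto Airflow.
    # DAG ids, task ids, and the processor callable are hypothetical placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.sensors.external_task import ExternalTaskSensor


    def run_advisory(**context):
        # Placeholder for the existing ProcessorAdvisory entry point.
        pass


    with DAG(
        dag_id="ews_advisory",                 # hypothetical DAG id
        start_date=datetime(2024, 1, 1),
        schedule_interval="0 6 * * *",         # illustrative daily schedule
        catchup=False,
    ) as dag:

        # Replaces query_past_successes(): wait on upstream DAG runs instead of
        # polling status files from inside process_pre_job().
        wait_for_deposition = ExternalTaskSensor(
            task_id="wait_for_deposition",
            external_dag_id="ews_deposition",  # hypothetical upstream DAG
            external_task_id=None,             # wait for the whole DAG run
            timeout=5 * 60 * 60,               # mirrors the old 5-hour error window
            poke_interval=10 * 60,
            mode="reschedule",
        )

        wait_for_environment = ExternalTaskSensor(
            task_id="wait_for_environment",
            external_dag_id="ews_environment", # hypothetical upstream DAG
            external_task_id=None,
            timeout=5 * 60 * 60,
            poke_interval=10 * 60,
            mode="reschedule",
        )

        advisory = PythonOperator(
            task_id="run_advisory",
            python_callable=run_advisory,
        )

        [wait_for_deposition, wait_for_environment] >> advisory

With this shape, process_pre_job() can simply return True (as the patched implementations now do) because data availability and upstream-job success are enforced by the scheduler before the processor task ever starts.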