Commit a3279c0d authored by L. Bower

pre-airflow migration code retirement

parent 9736bd28

@@ -448,9 +448,9 @@ class Processor:
         # run any checks before creating a job directory
         # if this fails, then make a note once there is a job directory
         ready = self.process_pre_job(args)
-
-        if not ready:
-            self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it")
+        #
+        # if not ready:
+        #     self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it")

         # create job directory
         Path(jobPath).mkdir(parents = True, exist_ok = True)

@@ -622,7 +622,7 @@ class Processor:
         endScript(premature = False)

     @abstractmethod
-    def process_pre_job(self, args):
+    def process_pre_job(self, args) -> bool:
         raise NotImplementedError

     @abstractmethod

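Taken together, the two Processor.py hunks above reduce the pre-job hook to a bare signal: process_pre_job() is now annotated -> bool, and a False result no longer changes control flow. A minimal sketch of the resulting contract (Processor and process_pre_job come from the diff; ExampleProcessor is illustrative only):

    from abc import ABC, abstractmethod

    class Processor(ABC):

        @abstractmethod
        def process_pre_job(self, args) -> bool:
            # True means the job may proceed to full processing
            raise NotImplementedError

    class ExampleProcessor(Processor):

        def process_pre_job(self, args) -> bool:
            # readiness checks are being retired from the processors,
            # ahead of moving that gating into the workflow scheduler
            return True
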
@@ -13,14 +13,13 @@ from ProcessorUtils import (
     add_filters_to_sublogger,
     endScript,
     open_and_check_config,
-    query_past_successes,
     short_name
 )

 class ProcessorAdvisory(Processor):

-    def process_pre_job(self, args):
+    def process_pre_job(self, args) -> bool:
         return self.process_pre_job_advisory(args)

@@ -37,18 +36,18 @@ class ProcessorAdvisory(Processor):
     logger = logging.getLogger('Processor.Advisory')
     add_filters_to_sublogger(logger)

-    def process_pre_job_advisory(self,input_args: dict):
+    def process_pre_job_advisory(self,input_args: dict) -> bool:

-        self.logger.info('started process_pre_job_advisory()')
-
-        # check configs can be loaded
-        config_fns: List[str] = input_args['config_paths']
-        for configFile in config_fns:
-            try:
-                config_i = open_and_check_config(configFile)
-            except:
-                self.logger.exception(f"Failure in opening or checking config {configFile}")
-                endScript(premature=True)
+        # self.logger.info('started process_pre_job_advisory()')
+        #
+        # # check configs can be loaded
+        # config_fns: List[str] = input_args['config_paths']
+        # for configFile in config_fns:
+        #     try:
+        #         config_i = open_and_check_config(configFile)
+        #     except:
+        #         self.logger.exception(f"Failure in opening or checking config {configFile}")
+        #         endScript(premature=True)

         # check pre-requisite jobs are complete

@@ -59,13 +58,13 @@ class ProcessorAdvisory(Processor):
             # needs a YesterdayDateString available for the survey part of
             # advisory config to point to the status file.
-            dependent_components = config_i['Advisory'].get(
-                    'DependentComponents',
-                    ['Deposition','Environment'])
-            for dependent_component in dependent_components:
-
-                query_past_successes(input_args, dependent_component)
+            # dependent_components = config_i['Advisory'].get(
+            #         'DependentComponents',
+            #         ['Deposition','Environment'])
+            # for dependent_component in dependent_components:
+            #
+            #     query_past_successes(input_args, dependent_component)

         return True

@@ -11,7 +11,6 @@ import iris
 from iris.cube import CubeList

 from Processor import Processor
-from ProcessorServer import process_pre_job_server_download
 from ProcessorUtils import (
     get_only_existing_globs,
     subprocess_and_log,

@@ -24,8 +23,9 @@ class ProcessorDeposition(Processor):
     """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """

-    def process_pre_job(self, args):
-        return process_pre_job_server_download(args)
+    def process_pre_job(self, args) -> bool:
+        # return process_pre_job_server_download(args)
+        return True

     def process_in_job(self, jobPath, status, configjson, component) -> object:

@@ -10,7 +10,6 @@ import os
 from Processor import Processor

 from ProcessorServer import (
     get_data_from_server,
-    process_pre_job_server_download
 )
 from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor
 from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo

@@ -25,8 +24,8 @@ from ProcessorUtils import (

 class ProcessorEnvironment(Processor):

     def process_pre_job(self, args):
-        return process_pre_job_server_download(args)
+        # return process_pre_job_server_download(args)
+        return True

     def process_in_job(self, jobPath, status, configjson, component) -> object:
         return self.process_in_job_env2_0(jobPath, status, configjson, component)

@@ -31,13 +31,12 @@ from ProcessorUtils import (
     get_only_existing_globs,
     endJob,
     add_filters_to_sublogger,
-    query_past_successes,
     short_name,
     disease_latin_name_dict
 )

 class ProcessorEpidemiology(Processor):

-    def process_pre_job(self, args):
+    def process_pre_job(self, args) -> bool:
         return self.process_pre_job_epi(args)

@@ -60,9 +59,9 @@ class ProcessorEpidemiology(Processor):
         self.logger.info('started process_pre_job_epi()')

         # check pre-requisite jobs are complete
-        query_past_successes(input_args, 'Deposition')
-
-        query_past_successes(input_args, 'Environment')
+        # query_past_successes(input_args, 'Deposition')
+        #
+        # query_past_successes(input_args, 'Environment')

         config_fns: List[str] = input_args['config_paths']

@@ -23,89 +23,89 @@ from ProcessorUtils import (
 logger = logging.getLogger('Processor.Server')
 add_filters_to_sublogger(logger)

-def process_pre_job_server_download(input_args: dict):
-    '''This is set up for environmental suitability v2.0 and deposition.
-    Returns a boolean as to whether the job is ready for full processing.'''
-
-    logger.info('started process_pre_job_willow_download()')
-
-    # Check if there is a file available on willow
-    logger.debug('Checking for file(s) on remote server')
-
-    config_paths: List[str] = input_args['config_paths']
-    start_date: str = input_args['start_date']
-    component: str = input_args['component']
-
-    for i,config_path in enumerate(config_paths):
-
-        config = open_and_check_config(config_path)
-
-        config['StartString'] = start_date
-
-        file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
-        file_name = Template(config[component]['InputFileTemplate']).substitute(**config)
-        logger.info(f"Checking for existence of {file_path}/{file_name}.tar.gz")
-
-        timenow = datetime.datetime.now(tz=datetime.timezone.utc).time()
-
-        server_name: str = config['ServerName']
-        full_file_path = f"{file_path}/{file_name}.tar.gz"
-
-        """
-        check the incoming met data tar exists and is a valid tar file (on either remote or local machine)
-        """
-        if server_name == "": # means the file is local
-            data_is_ready = os.path.exists(full_file_path) and tarfile.is_tarfile(full_file_path)
-        else:
-            cmd_check_file = ["ssh", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", server_name,
-                              f"test -f {full_file_path} && tar -tzf {full_file_path} >/dev/null"]
-            run_in_shell: bool = False
-
-            description_short = 'subprocess_ssh'
-            description_long = f"Checking for existence of {file_path}/{file_name}.tar.gz"
-
-            status = subprocess_and_log(cmd_check_file, description_short, description_long, check = False,
-                                        shell = run_in_shell)
-            data_is_ready = status.returncode == 0
-
-        if not data_is_ready:
-
-            # a time check in UTC. If it's late, raise warning, if very late, raise error
-
-            time_0 = config[component]['TimeExpectedAvailable']
-            time_0 = datetime.datetime.strptime(time_0,'%H%M')
-
-            time_until_warn = datetime.timedelta(hours=4)
-            time_until_error = datetime.timedelta(hours=5)
-
-            time_warn = (time_0 + time_until_warn).time()
-            time_error = (time_0 + time_until_error).time()
-
-            message = f"Data not yet available for config {i+1} of {len(config_paths)}, expected between {time_0.time()} and {time_error} and long before {time_error}"
-
-            if timenow > time_error:
-                # job is not able to proceed
-
-                logger.warning(message)
-
-                return False
-
-            elif timenow > time_warn:
-                # job is not ready to proceed
-
-                logger.warning(message)
-                endScript(premature=True)
-
-            else:
-                # some other problem with the job
-
-                logger.info(message)
-                endScript(premature=True)
-
-        elif data_is_ready:
-            logger.info(f"Data is available for config {i+1} of {len(config_paths)}, calculation shall proceed")
-
-    return True
+# def process_pre_job_server_download(input_args: dict):
+#     '''This is set up for environmental suitability v2.0 and deposition.
+#     Returns a boolean as to whether the job is ready for full processing.'''
+#
+#     logger.info('started process_pre_job_willow_download()')
+#
+#     # Check if there is a file available on willow
+#     logger.debug('Checking for file(s) on remote server')
+#
+#     config_paths: List[str] = input_args['config_paths']
+#     start_date: str = input_args['start_date']
+#     component: str = input_args['component']
+#
+#     for i,config_path in enumerate(config_paths):
+#
+#         config = open_and_check_config(config_path)
+#
+#         config['StartString'] = start_date
+#
+#         file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
+#         file_name = Template(config[component]['InputFileTemplate']).substitute(**config)
+#         logger.info(f"Checking for existence of {file_path}/{file_name}.tar.gz")
+#
+#         timenow = datetime.datetime.now(tz=datetime.timezone.utc).time()
+#
+#         server_name: str = config['ServerName']
+#         full_file_path = f"{file_path}/{file_name}.tar.gz"
+#
+#         """
+#         check the incoming met data tar exists and is a valid tar file (on either remote or local machine)
+#         """
+#         if server_name == "": # means the file is local
+#             data_is_ready = os.path.exists(full_file_path) and tarfile.is_tarfile(full_file_path)
+#         else:
+#             cmd_check_file = ["ssh", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", server_name,
+#                               f"test -f {full_file_path} && tar -tzf {full_file_path} >/dev/null"]
+#             run_in_shell: bool = False
+#
+#             description_short = 'subprocess_ssh'
+#             description_long = f"Checking for existence of {file_path}/{file_name}.tar.gz"
+#
+#             status = subprocess_and_log(cmd_check_file, description_short, description_long, check = False,
+#                                         shell = run_in_shell)
+#             data_is_ready = status.returncode == 0
+#
+#         if not data_is_ready:
+#
+#             # a time check in UTC. If it's late, raise warning, if very late, raise error
+#
+#             time_0 = config[component]['TimeExpectedAvailable']
+#             time_0 = datetime.datetime.strptime(time_0,'%H%M')
+#
+#             time_until_warn = datetime.timedelta(hours=4)
+#             time_until_error = datetime.timedelta(hours=5)
+#
+#             time_warn = (time_0 + time_until_warn).time()
+#             time_error = (time_0 + time_until_error).time()
+#
+#             message = f"Data not yet available for config {i+1} of {len(config_paths)}, expected between {time_0.time()} and {time_error} and long before {time_error}"
+#
+#             if timenow > time_error:
+#                 # job is not able to proceed
+#
+#                 logger.warning(message)
+#
+#                 return False
+#
+#             elif timenow > time_warn:
+#                 # job is not ready to proceed
+#
+#                 logger.warning(message)
+#                 endScript(premature=True)
+#
+#             else:
+#                 # some other problem with the job
+#
+#                 logger.info(message)
+#                 endScript(premature=True)
+#
+#         elif data_is_ready:
+#             logger.info(f"Data is available for config {i+1} of {len(config_paths)}, calculation shall proceed")
+#
+#     return True

 def upload(config,FilesToSend,component):

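The retired function above polled for a met-data tarball (locally, or over ssh when ServerName is set) and used wall-clock thresholds to warn or abort when data was late. Under a scheduler such as Airflow, the same gate is usually expressed as a sensor that re-pokes until the data arrives; a minimal sketch, assuming Airflow 2.x (the sensor class and its arguments are illustrative, not part of this commit):

    import os
    import tarfile

    from airflow.sensors.base import BaseSensorOperator

    class MetDataAvailableSensor(BaseSensorOperator):
        """Succeeds once the expected tarball exists and is a readable tar file."""

        def __init__(self, full_file_path: str, **kwargs):
            super().__init__(**kwargs)
            self.full_file_path = full_file_path

        def poke(self, context) -> bool:
            # local equivalent of the retired check; a remote variant would
            # run the same test over ssh, as the old cmd_check_file did
            return (os.path.exists(self.full_file_path)
                    and tarfile.is_tarfile(self.full_file_path))

The sensor's timeout and poke_interval settings then take over the role that the TimeExpectedAvailable warn/error windows played here.
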
@@ -321,31 +321,31 @@ def query_component_success(config_i,job_run: str, job_to_check: str):
     return True

-def query_past_successes(input_args: dict,
-                         component_to_query: str = 'Deposition'):
-    '''Checks if deposition and environment jobs are already completed
-    successfully. If not, it raises an error.'''
-
-    component: str = input_args['component']
-
-    assert component_to_query in ['Deposition','Environment','Epidemiology']
-
-    # check configs can be loaded
-    config_fns: List[str] = input_args['config_paths']
-    for configFile in config_fns:
-        try:
-            config_i = open_and_check_config(configFile)
-        except:
-            logger.exception(f"Failure in opening or checking config {configFile}")
-            endScript(premature=True)
-
-        # some config initialisation is necessary
-        config_i['StartString'] = input_args['start_date']
-
-        # check if dependent job is readily available
-        query_component_success(config_i,component,component_to_query)
-
-    return True
+# def query_past_successes(input_args: dict,
+#                          component_to_query: str = 'Deposition'):
+#     '''Checks if deposition and environment jobs are already completed
+#     successfully. If not, it raises an error.'''
+#
+#     component: str = input_args['component']
+#
+#     assert component_to_query in ['Deposition','Environment','Epidemiology']
+#
+#     # check configs can be loaded
+#     config_fns: List[str] = input_args['config_paths']
+#     for configFile in config_fns:
+#         try:
+#             config_i = open_and_check_config(configFile)
+#         except:
+#             logger.exception(f"Failure in opening or checking config {configFile}")
+#             endScript(premature=True)
+#
+#         # some config initialisation is necessary
+#         config_i['StartString'] = input_args['start_date']
+#
+#         # check if dependent job is readily available
+#         query_component_success(config_i,component,component_to_query)
+#
+#     return True

 def dataframe_to_series(df: pd.DataFrame) -> pd.Series:
     """Reformate a pandas Dataframe into a pandas Series.

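query_past_successes() enforced cross-component ordering imperatively, for example Advisory waiting on Deposition and Environment. In Airflow that ordering is typically declared rather than checked in-process; a hypothetical sketch, assuming Airflow 2.x and one DAG per component (all ids below are invented for illustration):

    import pendulum

    from airflow import DAG
    from airflow.sensors.external_task import ExternalTaskSensor

    with DAG(
        dag_id='advisory',                    # illustrative downstream DAG
        start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
        schedule='@daily',                    # 'schedule_interval' before Airflow 2.4
    ) as dag:
        # block the Advisory run until the upstream component's run succeeds
        wait_for_deposition = ExternalTaskSensor(
            task_id='wait_for_deposition',
            external_dag_id='deposition',     # illustrative upstream DAG id
            external_task_id=None,            # None waits for the whole DAG run
        )
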
@@ -24,24 +24,23 @@ from EpiModel.EpiUtils import (
 from Processor import Processor
 from ProcessorServer import (
-    process_pre_job_server_download,
     get_data_from_server
 )
 from ProcessorUtils import (
     add_filters_to_sublogger,
     calc_epi_date_range,
     open_and_check_config,
-    query_past_successes,
     short_name)

 class ProcessorMetResample(Processor):

-    def process_pre_job(self, args):
+    def process_pre_job(self, args) -> bool:
         self.logger.debug('Performing process_pre_job()')

         # If it will work on a single forecast, get a dedicated download
         #return process_pre_job_server_download(args)
-        return query_past_successes(args, 'Environment')
+        # return query_past_successes(args, 'Environment')
+        return True

     def process_in_job(self, jobPath, status, configjson, component) -> object:
         return self.process_in_job_met_resample(jobPath, status, configjson, component)