From 9bf25b74f076fbb06b9cc6fb4fa06c379206f3b2 Mon Sep 17 00:00:00 2001 From: lb584 <lb584@cam.ac.uk> Date: Tue, 10 Oct 2023 12:00:42 +0100 Subject: [PATCH] experimenting with making processor class-based --- coordinator/Processor.py | 1015 ++++++++++++++-------------- coordinator/ProcessorComponents.py | 8 +- coordinator/ProcessorDeposition.py | 232 ++++--- 3 files changed, 659 insertions(+), 596 deletions(-) diff --git a/coordinator/Processor.py b/coordinator/Processor.py index 84d3d3b..c5c2558 100755 --- a/coordinator/Processor.py +++ b/coordinator/Processor.py @@ -1,4 +1,4 @@ -#Processor.py +# Processor.py '''To be used for handling any operational component of wheat rust early warning system. @@ -16,7 +16,8 @@ Example usage:: or:: $ ./run_Processor.sh -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715 ''' -from typing import List +from abc import abstractmethod, ABCMeta +from typing import List, Union, Any print("Make sure to `conda activate py3EWSepi` environment!") @@ -33,582 +34,618 @@ import shutil import sys # gitlab projects -from flagdir import jobStatus # created by jws52 +from flagdir import jobStatus # created by jws52 # submodules of this project import BufferingSMTPHandler import ProcessorComponents from ProcessorUtils import ( - append_item_to_list, - clear_up, - endScript, - endJob, - open_and_check_config, - PasswordODKFilter, - short_name + append_item_to_list, + clear_up, + endScript, + endJob, + open_and_check_config, + PasswordODKFilter, + short_name ) -# initialise default values for configuration -script_name = 'Processor' +class Processor: -timeNow = datetime.datetime.today() -dateToday = timeNow.date() -todayString = timeNow.strftime('%Y%m%d') -nowString = timeNow.strftime('%Y%m%d-%H%M-%S') + __metaclass__ = ABCMeta -# get the path to this script -script_path = os.path.dirname(__file__)+'/' + logger = None + log_path_default = None + loglevels = None + todayString = None -coordinator_path = script_path -# log file for all jobs -log_path_project = f"{coordinator_path}logs/log.txt" + def __init__(self) -> None: + super().__init__() + self.setup() -# job-specific log file will be written here until a job directory exits, when it will be moved there -log_path_default = f"{coordinator_path}logs/log_{nowString}.txt" -# get the email credentials file path from the environment variables -assert 'EMAIL_CRED' in os.environ -email_credential_fn = os.environ['EMAIL_CRED'] -assert os.path.exists(email_credential_fn) + def setup(self): + # initialise default values for configuration -with open(email_credential_fn,'r') as f: - gmail_config = json.load(f) + script_name = 'Processor' -# check contents -required_keys = ['user','pass','host','port','toaddrs'] -for required_key in required_keys: - assert required_key in gmail_config + timeNow = datetime.datetime.today() + dateToday = timeNow.date() + self.todayString = timeNow.strftime('%Y%m%d') + self.nowString = timeNow.strftime('%Y%m%d-%H%M-%S') -# load config from python dictionary (could be loaded from json file) -# TODO: smtp handler can only use tls, but ssl is more secure. 
Look into defining/writing a suitable smtp handler -logConfigDict = { - 'version' : 1, - 'disable_existing_loggers': False, - 'formatters' : { - 'simple' : { - 'format' : '%(name)s : %(levelname)s - %(message)s' + # get the path to this script + script_path = os.path.dirname(__file__) + '/' + + coordinator_path = script_path + + # log file for all jobs + log_path_project = f"{coordinator_path}logs/log.txt" + + # job-specific log file will be written here until a job directory exits, when it will be moved there + self.log_path_default = f"{coordinator_path}logs/log_{self.nowString}.txt" + + # get the email credentials file path from the environment variables + assert 'EMAIL_CRED' in os.environ + email_credential_fn = os.environ['EMAIL_CRED'] + assert os.path.exists(email_credential_fn) + + with open(email_credential_fn, 'r') as f: + gmail_config = json.load(f) + + # check contents + required_keys = ['user', 'pass', 'host', 'port', 'toaddrs'] + for required_key in required_keys: + assert required_key in gmail_config + + # load config from python dictionary (could be loaded from json file) + # TODO: smtp handler can only use tls, but ssl is more secure. Look into defining/writing a suitable smtp handler + logConfigDict = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'simple': { + 'format': '%(name)s : %(levelname)s - %(message)s' }, - 'detailed' : { - 'format' : f""" - For command: - {' '.join(sys.argv)} + 'detailed': { + 'format': f""" + For command: + {' '.join(sys.argv)} - %(levelname)s in %(name)s encountered at %(asctime)s: + %(levelname)s in %(name)s encountered at %(asctime)s: - %(message)s + %(message)s - Resolve this error and restart processing. - - """, - 'datefmt' : '%Y-%m-%d %H:%M:%S' - } - }, - 'filters' : { - 'mask_passwords' : { - '()' : PasswordODKFilter - } - }, - 'handlers' : { - - # logging for project - 'handler_rot_file' : { - 'class' : 'logging.handlers.TimedRotatingFileHandler', - 'level' : 'INFO', - 'formatter' : 'simple', - 'filters' : ['mask_passwords'], - 'filename' : log_path_project, - # time of day on given day - 'when' : 'W2', # rotate file every Wednesday - 'atTime' : datetime.time(1,0,0), # at 1am local time - 'backupCount' : 12, - }, + Resolve this error and restart processing. - # logging for job - 'handler_file' : { - 'class' : 'logging.FileHandler', - 'level' : 'INFO', - 'formatter' : 'simple', - 'filters' : ['mask_passwords'], - 'filename' : log_path_default, - 'mode' : 'a', # 'a' for append + """, + 'datefmt': '%Y-%m-%d %H:%M:%S' + } }, - # to email errors to maintainers - 'handler_buffered_email': { - 'class': 'BufferingSMTPHandler.BufferingSMTPHandler', - 'level': 'ERROR', - 'server': (gmail_config['host'], gmail_config['port']), # host, port. 465 fsor SSL, 587 for tls - 'credentials': (gmail_config['user'], gmail_config['pass']), - 'fromaddr': gmail_config['user'], - 'toaddrs': gmail_config['toaddrs'], - 'subject': 'ERROR in EWS Processor', - 'formatter': 'detailed', - 'filters': ['mask_passwords'], - 'capacity': 100 + 'filters': { + 'mask_passwords': { + '()': PasswordODKFilter + } }, - }, - 'loggers' : { - # this is activated when this script is imported - # i.e. 
with logging.getLogger('Process.') - script_name : { - 'level' : 'INFO', - 'handlers' : ['handler_rot_file','handler_file','handler_buffered_email'], - 'propagate' : True, + 'handlers': { + + # logging for project + 'handler_rot_file': { + 'class': 'logging.handlers.TimedRotatingFileHandler', + 'level': 'INFO', + 'formatter': 'simple', + 'filters': ['mask_passwords'], + 'filename': log_path_project, + # time of day on given day + 'when': 'W2', # rotate file every Wednesday + 'atTime': datetime.time(1, 0, 0), # at 1am local time + 'backupCount': 12, + }, + + # logging for job + 'handler_file': { + 'class': 'logging.FileHandler', + 'level': 'INFO', + 'formatter': 'simple', + 'filters': ['mask_passwords'], + 'filename': self.log_path_default, + 'mode': 'a', # 'a' for append + }, + # to email errors to maintainers + 'handler_buffered_email': { + 'class': 'BufferingSMTPHandler.BufferingSMTPHandler', + 'level': 'ERROR', + 'server': (gmail_config['host'], gmail_config['port']), + # host, port. 465 fsor SSL, 587 for tls + 'credentials': (gmail_config['user'], gmail_config['pass']), + 'fromaddr': gmail_config['user'], + 'toaddrs': gmail_config['toaddrs'], + 'subject': 'ERROR in EWS Processor', + 'formatter': 'detailed', + 'filters': ['mask_passwords'], + 'capacity': 100 + }, }, - # this is activated when this script is called on the command line - # or from a bash script - # i.e. with logging.getLogger(__name__) when name == '__main__' - '__main__' : { - 'level' : 'INFO', - 'handlers' : ['handler_rot_file','handler_file','handler_buffered_email'], - 'propagate' : True, + 'loggers': { + # this is activated when this script is imported + # i.e. with logging.getLogger('Process.') + script_name: { + 'level': 'INFO', + 'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email'], + 'propagate': True, + }, + # this is activated when this script is called on the command line + # or from a bash script + # i.e. with logging.getLogger(__name__) when name == '__main__' + '__main__': { + 'level': 'INFO', + 'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email'], + 'propagate': True, + } } } -} - -logging.config.dictConfig(logConfigDict) - -print(__name__) -# create a logger named according to how the file is called -#logger = logging.getLogger(__name__) -logger = logging.getLogger(script_name) - -loglevels = {'debug':logging.DEBUG, - 'info':logging.INFO, - 'warning':logging.WARNING, - 'error':logging.ERROR, - 'critical':logging.CRITICAL, - } - -def move_default_logfile_handler(dstPathName,srcPathName=log_path_default,FileHandlerName='handler_file',): - '''For on-the-fly move of logging from default file to another. 
Copies the - contents of the source log file to destination, switches file handler in - logger, then removes source log file.''' - - logger.info(f"Moving logfile location from:\n{srcPathName}\nto:\n{dstPathName}") - - # copy old log file to new filename - srcPath = Path(srcPathName) - dstPath = Path(dstPathName) - assert srcPath.exists() - assert dstPath.parent.is_dir() - - oldFileHandler = [h for h in logger.handlers if h.name==FileHandlerName][0] - oldFormatter = oldFileHandler.formatter - - # define new file handler - newfilehandler = logging.FileHandler(dstPath,mode=oldFileHandler.mode) - newfilehandler.setLevel(oldFileHandler.level) - newfilehandler.setFormatter(oldFormatter) - - shutil.copyfile(srcPath,dstPath) - - # add handler for destination file - logger.info('Adding new logging file handler to destination path') - - logger.addHandler(newfilehandler) - - # remove handler for source file - logger.info('Stopping write to old file handler') - logger.removeHandler(oldFileHandler) - oldFileHandler.close() - logger.info('Successfully stopped write to old file handler') - - # delete old log file - logger.info('Deleting old log file, since all content available in new log file stream') - os.rename(srcPathName,srcPathName+'removed') - - return - - -def parse_and_check_args(todayString) -> dict: - - # define the command line arguments - my_parser = argparse.ArgumentParser(description='Command-line arguments for coordinator script of env suitability model') - - # Add the arguments - # positional arguments do not start with - or -- and are always required - # optional arguments start with - or -- and default is required = False - - my_parser.add_argument( - '-p', '--component', - type = str, - choices = list(short_name.keys()), - required = True, - dest = 'component', - help = '''Name of EWS component to process, which must be present - in the config file.''') - - my_parser.add_argument( - '-c', '--config', - metavar = 'path', - type = str, - nargs = '+', # require at least one path - dest = 'config_paths', - required = True, - #default = ['config_Ethiopia_template_stripe.json'], # remove once live - #default = ['config_Bangladesh_template_stripe.json'], # remove once live - #default = ['config_Nepal_template_stripe.json'], # remove once live - help = '''path to a config file(s). More than one can be provided, - in which case each is worked on in turn (e.g. one each for stripe, stem, leaf). 
- Do not place other options between these.''') - - my_parser.add_argument( - '-l','--loglevel', - action = 'store', - choices = list(loglevels.keys()), - default = 'info', - help = 'verbosity of log messaging (debug, info, warning, error, critical)\n default is debug', - dest = 'log_level', # this names the attribute that will be parsed - ) - - my_parser.add_argument( - '--islive', - action = 'store_true', - help = 'If live, email messages are sent to maintainers for warning and errors', - dest = 'live', - ) - - my_parser.add_argument( - '-s','--start-date','-i','--initial-date', - metavar = 'YYYYMMDD', - action = 'store', - default = todayString, - help = 'Initial day of calculation, starting at 00 UTC (Default is today)', - dest = 'start_date', - ) - - my_parser.add_argument( - '--noupload', - action = 'store_true', - help = 'whether results of script should be saved to willow public directory' - ) - - my_parser.add_argument( - '--clearup', - action = 'store_true', - help = 'whether to delete mid-process files at the end of a successful job', - dest = 'clearup', - ) - - # get an object holding all of the args - args = my_parser.parse_args() - - # Check the args - - logger.info(f"Command-line options are:\n{args}") - - if not isinstance(args.config_paths,list): - logger.error('Expecting a list of config paths') - raise RuntimeError - - # check the startstring - if args.start_date is not todayString: - try: - # check date string is formatted correctly - provided_start_date = datetime.datetime.strptime(args.start_date,'%Y%m%d') - today_date = datetime.datetime.strptime(todayString,'%Y%m%d') - - # early limit is quite arbitrary, but this is earliest year of available survey data for Ethiopia - date_limit_early = datetime.datetime.strptime('20070101','%Y%m%d') - assert date_limit_early < provided_start_date - assert provided_start_date <= today_date - - except (ValueError, AssertionError) as e: - logger.exception("Provided start date string is formatted incorrectly or out of range, or end date not also defined") - raise - - dictionary: dict = vars(args) - return dictionary - - -def set_log_level(log_level: str): - new_log_level = loglevels[log_level] - - # modify log level of all loggers - logger.info(f"logging level being changed to {new_log_level} because of command-line option") - loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict] - for logger_i in loggers: logger_i.setLevel(new_log_level) - - -def build_universal_config(configs: list,component: str, universal_config=None): - '''This config obtains aspects of each dict in configs that must be common to - them all. 
''' - - # initialise universal config - if not universal_config: - universal_config = { - 'WorkspacePathout' : set(), - 'ProcessPreJob' : set(), - 'ProcessInJob' : set(), - 'ProcessEWSPlotting' : set(), - 'ServerPath' : set(), - 'ServerName' : set(), - 'ServerKey' : set()} - - keys = universal_config.keys() - - # get value of each key from each config file - for configFile in configs: - - try: - - config_i = open_and_check_config(configFile) - - except: - - logger.exception(f"Failure in opening or checking config {configFile}") - endScript(premature=True) - for key in keys: - + logging.config.dictConfig(logConfigDict) + + print(__name__) + # create a logger named according to how the file is called + # logger = logging.getLogger(__name__) + self.logger = logging.getLogger(script_name) + + self.loglevels = {'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL, + } + + + def move_default_logfile_handler(self, dstPathName, srcPathName = None, FileHandlerName = 'handler_file', ): + '''For on-the-fly move of logging from default file to another. Copies the + contents of the source log file to destination, switches file handler in + logger, then removes source log file.''' + + if srcPathName is None: + srcPathName = self.log_path_default + + self.logger.info(f"Moving logfile location from:\n{srcPathName}\nto:\n{dstPathName}") + + # copy old log file to new filename + srcPath = Path(srcPathName) + dstPath = Path(dstPathName) + assert srcPath.exists() + assert dstPath.parent.is_dir() + + oldFileHandler = [h for h in self.logger.handlers if h.name == FileHandlerName][0] + oldFormatter = oldFileHandler.formatter + + # define new file handler + newfilehandler = logging.FileHandler(dstPath, mode = oldFileHandler.mode) + newfilehandler.setLevel(oldFileHandler.level) + newfilehandler.setFormatter(oldFormatter) + + shutil.copyfile(srcPath, dstPath) + + # add handler for destination file + self.logger.info('Adding new logging file handler to destination path') + + self.logger.addHandler(newfilehandler) + + # remove handler for source file + self.logger.info('Stopping write to old file handler') + self.logger.removeHandler(oldFileHandler) + oldFileHandler.close() + self.logger.info('Successfully stopped write to old file handler') + + # delete old log file + self.logger.info('Deleting old log file, since all content available in new log file stream') + os.rename(srcPathName, srcPathName + 'removed') + + return + + + def parse_and_check_args(self) -> dict: + + # define the command line arguments + my_parser = argparse.ArgumentParser( + description = 'Command-line arguments for coordinator script of env suitability model') + + # Add the arguments + # positional arguments do not start with - or -- and are always required + # optional arguments start with - or -- and default is required = False + + my_parser.add_argument( + '-p', '--component', + type = str, + choices = list(short_name.keys()), + required = True, + dest = 'component', + help = '''Name of EWS component to process, which must be present + in the config file.''') + + my_parser.add_argument( + '-c', '--config', + metavar = 'path', + type = str, + nargs = '+', # require at least one path + dest = 'config_paths', + required = True, + # default = ['config_Ethiopia_template_stripe.json'], # remove once live + # default = ['config_Bangladesh_template_stripe.json'], # remove once live + # default = ['config_Nepal_template_stripe.json'], # remove once live + help = '''path 
to a config file(s). More than one can be provided, + in which case each is worked on in turn (e.g. one each for stripe, stem, leaf). + Do not place other options between these.''') + + my_parser.add_argument( + '-l', '--loglevel', + action = 'store', + choices = list(self.loglevels.keys()), + default = 'info', + help = 'verbosity of log messaging (debug, info, warning, error, critical)\n default is debug', + dest = 'log_level', # this names the attribute that will be parsed + ) + + my_parser.add_argument( + '--islive', + action = 'store_true', + help = 'If live, email messages are sent to maintainers for warning and errors', + dest = 'live', + ) + + my_parser.add_argument( + '-s', '--start-date', '-i', '--initial-date', + metavar = 'YYYYMMDD', + action = 'store', + default = self.todayString, + help = 'Initial day of calculation, starting at 00 UTC (Default is today)', + dest = 'start_date', + ) + + my_parser.add_argument( + '--noupload', + action = 'store_true', + help = 'whether results of script should be saved to willow public directory' + ) + + my_parser.add_argument( + '--clearup', + action = 'store_true', + help = 'whether to delete mid-process files at the end of a successful job', + dest = 'clearup', + ) + + # get an object holding all of the args + args = my_parser.parse_args() + + # Check the args + + self.logger.info(f"Command-line options are:\n{args}") + + if not isinstance(args.config_paths, list): + self.logger.error('Expecting a list of config paths') + raise RuntimeError + + # check the startstring + if args.start_date is not self.todayString: try: - universal_config[key].add(config_i[key]) - - except KeyError: + # check date string is formatted correctly + provided_start_date = datetime.datetime.strptime(args.start_date, '%Y%m%d') + today_date = datetime.datetime.strptime(self.todayString, '%Y%m%d') - # key must be in component sub-dict - universal_config[key].add(config_i[component][key]) + # early limit is quite arbitrary, but this is earliest year of available survey data for Ethiopia + date_limit_early = datetime.datetime.strptime('20070101', '%Y%m%d') + assert date_limit_early < provided_start_date + assert provided_start_date <= today_date - # Check for and keep only one value per key - for key in keys: + except (ValueError, AssertionError) as e: + self.logger.exception( + "Provided start date string is formatted incorrectly or out of range, or end date not also defined") + raise - if len(universal_config[key]) > 1: + dictionary: dict = vars(args) + return dictionary - logger.error(f"Config files point to multiple {key} but this script can only handle one.") - endScript(premature=True) - universal_config[key] = universal_config[key].pop() + def set_log_level(self, log_level: str): + new_log_level = self.loglevels[log_level] - return universal_config + # modify log level of all self.loggers + self.logger.info(f"logging level being changed to {new_log_level} because of command-line option") + loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict] + for logger_i in loggers: logger_i.setLevel(new_log_level) -def run_Process(args: dict): - # check initial state of each config file, and gather terms that must apply - # across all provided configs + def build_universal_config(self, configs: list, component: str, universal_config = None): + '''This config obtains aspects of each dict in configs that must be common to + them all. 
''' - if not args["live"]: - # remove the log handler that would send emails - logger.handlers = [h for h in logger.handlers if not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)] + # initialise universal config + if not universal_config: + universal_config = { + 'WorkspacePathout': set(), + 'ProcessPreJob': set(), + 'ProcessInJob': set(), + 'ProcessEWSPlotting': set(), + 'ServerPath': set(), + 'ServerName': set(), + 'ServerKey': set()} - config_paths: List[str] = args['config_paths'] - component: str = args['component'] - start_date: str = args['start_date'] - noupload: bool = args['noupload'] - clearup: bool = args['clearup'] + keys = universal_config.keys() - universal_config = build_universal_config(config_paths, component) - - universal_config['StartString'] = start_date + # get value of each key from each config file + for configFile in configs: - logger.info(f"Universal config is\n{json.dumps(universal_config,indent=2)}") + try: + + config_i = open_and_check_config(configFile) + + except: + + self.logger.exception(f"Failure in opening or checking config {configFile}") + endScript(premature = True) - workspacePath = universal_config['WorkspacePathout'] + for key in keys: - process_pre_job = getattr(ProcessorComponents, universal_config['ProcessPreJob']) + try: + universal_config[key].add(config_i[key]) + + except KeyError: + + # key must be in component sub-dict + universal_config[key].add(config_i[component][key]) + + # Check for and keep only one value per key + for key in keys: - process_in_job = getattr(ProcessorComponents, universal_config['ProcessInJob']) + if len(universal_config[key]) > 1: + self.logger.error(f"Config files point to multiple {key} but this script can only handle one.") + endScript(premature = True) - process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting']) + universal_config[key] = universal_config[key].pop() - # determine job directory - jobPath = f'{workspacePath}{short_name[component]}_{start_date}' - - logger.info(f"Job path will be {jobPath}") + return universal_config - # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC - startTime = datetime.datetime.strptime(start_date+'03','%Y%m%d%H') - # run any checks before creating a job directory - # if this fails, then make a note once there is a job directory - ready = process_pre_job(args) - - if not ready: - logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it") + def run_Process(self, args: dict): + # check initial state of each config file, and gather terms that must apply + # across all provided configs - # create job directory - Path(jobPath).mkdir(parents=True, exist_ok=True) + if not args["live"]: + # remove the log handler that would send emails + self.logger.handlers = [h for h in self.logger.handlers if + not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)] - # lock job directory - with jobStatus(jobPath) as status: + config_paths: List[str] = args['config_paths'] + component: str = args['component'] + start_date: str = args['start_date'] + noupload: bool = args['noupload'] + clearup: bool = args['clearup'] - #lawrence comment in/out - # check for a status file in job directory - if status.had_initial_status: - logger.info(f"Job path already exists and has status {status.status}") + universal_config = self.build_universal_config(config_paths, component) - endScript(premature = status.status not in ['SUCCESS','INPROGRESS']) + 
universal_config['StartString'] = start_date - logger.info(f"Current status of job directory is {status.status}") + self.logger.info(f"Universal config is\n{json.dumps(universal_config, indent = 2)}") - # now that we have a useable job directory, move the log file there - logPathJob = f"{jobPath}/log.txt" + workspacePath = universal_config['WorkspacePathout'] + + # process_pre_job = getattr(ProcessorComponents, universal_config['ProcessPreJob']) + # + # process_in_job = getattr(ProcessorComponents, universal_config['ProcessInJob']) + # + # process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting']) + + # determine job directory + jobPath: str = f'{workspacePath}{short_name[component]}_{start_date}' + + self.logger.info(f"Job path will be {jobPath}") + + # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC + startTime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H') + + # run any checks before creating a job directory + # if this fails, then make a note once there is a job directory + ready = self.process_pre_job(args) - move_default_logfile_handler(dstPathName=logPathJob) - - # make a record if process_pre_job failed if not ready: - logger.error(f"Process_pre_job raised an error so making a record in the job file. For details, see earlier warnings log") - status.reset('ERROR') - - endJob(status,ignore_inprogress=True,premature=False) - - # files and directories that will be uploaded to public server - FilesToSend = [] + self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it") - # files and directories that will be earmarked for removal after a - # successful job - paths_to_clear = [] + # create job directory + Path(jobPath).mkdir(parents = True, exist_ok = True) - logger.info('Starting to work on each configuration') + # lock job directory + status: jobStatus + with jobStatus(jobPath) as status: - for configIndex, configtemplate in enumerate(config_paths): + # lawrence comment in/out + # check for a status file in job directory + if status.had_initial_status: + self.logger.info(f"Job path already exists and has status {status.status}") - config_paths_length: int = len(config_paths) - logger.info(f'Working on config {configIndex+1} of {config_paths_length}') - - try: - configjson = open_and_check_config(configtemplate) - except: - logger.exception(f"Failure in opening or checking config {configtemplate}") - # TODO: This case should test flagdir.jobStatus.__exit__() - raise # endJob('ERROR',premature=True) - - # provide specific case details to template config - - configjson['StartTime'] = startTime.strftime('%Y-%m-%d-%H%M') - configjson['StartString'] = start_date - - # from configtemplate create configFileName to describe the specific job - component: str = component - configFileName = f"{os.path.basename(configtemplate).replace('.json','')}_{component}" - - configjson['ConfigFilePath'] = configFileName - - # write the complete configuration file to job directory - with open(f"{jobPath}/{configFileName}.json",'w') as write_file: - json.dump(configjson,write_file,indent=4) - - proc_description = universal_config['ProcessInJob'] - try: - proc_out = process_in_job(jobPath,status,configjson,component) - except: - logger.exception(f"Some error in {proc_description}()") - status.reset('ERROR') - endJob(status,premature=True) - - # Set default case - # This would be improved by implementing a class structure - if proc_out is None: - proc_out = { - 
'output' : None, - 'clearup' : None} - - if 'output' in proc_out.keys(): - - append_item_to_list( - proc_out['output'], - FilesToSend, - proc_description, - status) - - if 'clearup' in proc_out.keys(): - - append_item_to_list( - proc_out['clearup'], - paths_to_clear, - proc_description, - status) - - # Run EWS-plotting command - - proc_description = universal_config['ProcessEWSPlotting'] - try: - EWSPlottingOutputs = process_EWS_plotting(jobPath,configjson) - except: - logger.exception(f"Error in {proc_description}()") + endScript(premature = status.status not in ['SUCCESS', 'INPROGRESS']) + + self.logger.info(f"Current status of job directory is {status.status}") + + # now that we have a useable job directory, move the log file there + logPathJob = f"{jobPath}/log.txt" + + self.move_default_logfile_handler(dstPathName = logPathJob) + + # make a record if process_pre_job failed + if not ready: + self.logger.error( + f"Process_pre_job raised an error so making a record in the job file. For details, see earlier warnings log") status.reset('ERROR') - endJob(status,premature=True) - logger.info('Finished with EWS-Plotting, appending images to list for transfer') - - if EWSPlottingOutputs: + endJob(status, ignore_inprogress = True, premature = False) - append_item_to_list( - EWSPlottingOutputs, - FilesToSend, - proc_description, - status) + # files and directories that will be uploaded to public server + FilesToSend = [] - logger.info(f'Finished with config {configIndex+1} of {config_paths_length}') + # files and directories that will be earmarked for removal after a + # successful job + paths_to_clear = [] - # send results to remote server + self.logger.info('Starting to work on each configuration') - if not noupload: - try: - ProcessorComponents.upload(universal_config, FilesToSend, component) - except IndexError: - status.reset('WARNING') + for configIndex, configtemplate in enumerate(config_paths): - except: - logger.exception('Failed to upload files to remote server') - status.reset('ERROR') - endJob(status,premature=True) - - # check if there is a second location on willow to provide results - if 'ServerPathExtra' in configjson[component]: + config_paths_length: int = len(config_paths) + self.logger.info(f'Working on config {configIndex + 1} of {config_paths_length}') + + try: + configjson = open_and_check_config(configtemplate) + except: + self.logger.exception(f"Failure in opening or checking config {configtemplate}") + # TODO: This case should test flagdir.jobStatus.__exit__() + raise # endJob('ERROR',premature=True) + + # provide specific case details to template config + + configjson['StartTime'] = startTime.strftime('%Y-%m-%d-%H%M') + configjson['StartString'] = start_date + + # from configtemplate create configFileName to describe the specific job + component: str = component + configFileName = f"{os.path.basename(configtemplate).replace('.json', '')}_{component}" + + configjson['ConfigFilePath'] = configFileName + + # write the complete configuration file to job directory + with open(f"{jobPath}/{configFileName}.json", 'w') as write_file: + json.dump(configjson, write_file, indent = 4) + + proc_description = universal_config['ProcessInJob'] + try: + proc_out = self.process_in_job(jobPath, status, configjson, component) + except: + self.logger.exception(f"Some error in {proc_description}()") + status.reset('ERROR') + endJob(status, premature = True) + + # Set default case + # This would be improved by implementing a class structure + if proc_out is None: + proc_out = { + 'output': 
None, + 'clearup': None} + + if 'output' in proc_out.keys(): + append_item_to_list( + proc_out['output'], + FilesToSend, + proc_description, + status) + + if 'clearup' in proc_out.keys(): + append_item_to_list( + proc_out['clearup'], + paths_to_clear, + proc_description, + status) + + # Run EWS-plotting command + + proc_description = universal_config['ProcessEWSPlotting'] + try: + EWSPlottingOutputs = self.process_post_job(jobPath, configjson) + except: + self.logger.exception(f"Error in {proc_description}()") + status.reset('ERROR') + endJob(status, premature = True) + + self.logger.info('Finished with EWS-Plotting, appending images to list for transfer') - logger.info('There is an extra path to send results to:') - - extra_path = configjson[component]['ServerPathExtra'] + if EWSPlottingOutputs: + append_item_to_list( + EWSPlottingOutputs, + FilesToSend, + proc_description, + status) - logger.info(extra_path) + self.logger.info(f'Finished with config {configIndex + 1} of {config_paths_length}') - universal_config_extra = universal_config.copy() - universal_config_extra['ServerPath'] = extra_path + # send results to remote server + if not noupload: try: - ProcessorComponents.upload(universal_config_extra, FilesToSend, component) + ProcessorComponents.upload(universal_config, FilesToSend, component) except IndexError: status.reset('WARNING') except: - logger.exception('Failed to upload files to extra directory on remote server') + self.logger.exception('Failed to upload files to remote server') status.reset('ERROR') - endJob(status,premature=True) - - else: - logger.info('Because noupload argument was present, not sending results to remote server') + endJob(status, premature = True) + + # check if there is a second location on willow to provide results + if 'ServerPathExtra' in configjson[component]: + + self.logger.info('There is an extra path to send results to:') + + extra_path = configjson[component]['ServerPathExtra'] - status.reset('SUCCESS') - - if status.is_success() & (clearup is True): + self.logger.info(extra_path) - logger.info('Clearing up') + universal_config_extra = universal_config.copy() + universal_config_extra['ServerPath'] = extra_path - clearup_dest_dir = f"{workspacePath}/clearup/{short_name[component]}_{start_date}/" - Path(clearup_dest_dir).mkdir(parents=True, exist_ok=True) + try: + ProcessorComponents.upload(universal_config_extra, FilesToSend, component) + except IndexError: + status.reset('WARNING') - logger.info(f"While developing, moving directories to this directory : {clearup_dest_dir}") + except: + self.logger.exception('Failed to upload files to extra directory on remote server') + status.reset('ERROR') + endJob(status, premature = True) - clear_up( paths_to_clear, clearup_dest = clearup_dest_dir) + else: + self.logger.info('Because noupload argument was present, not sending results to remote server') - endScript(premature=False) + status.reset('SUCCESS') + + if status.is_success() & (clearup is True): + self.logger.info('Clearing up') + + clearup_dest_dir = f"{workspacePath}/clearup/{short_name[component]}_{start_date}/" + Path(clearup_dest_dir).mkdir(parents = True, exist_ok = True) + + self.logger.info(f"While developing, moving directories to this directory : {clearup_dest_dir}") + + clear_up(paths_to_clear, clearup_dest = clearup_dest_dir) + + endScript(premature = False) + + @abstractmethod + def process_pre_job(self, args): + raise NotImplementedError + + @abstractmethod + def process_in_job(self, jobPath, status, configjson, component) -> object: + 
raise NotImplementedError + + @abstractmethod + def process_post_job(self, jobPath, configjson): + raise NotImplementedError if __name__ == '__main__': + print("Make sure to `conda activate py3EWSepi` environment!") + print("Make sure that flagdir package is available (on PYTHONPATH)") + processor = Processor() try: - logger.info("==========") - logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}") + processor.logger.info("==========") + processor.logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}") # load configurations - args_dict: dict = parse_and_check_args(todayString) - set_log_level(args_dict['log_level']) - run_Process(args_dict) + args_dict: dict = processor.parse_and_check_args() + processor.set_log_level(args_dict['log_level']) + processor.run_Process(args_dict) except SystemExit as e: print("caught with code " + str(e.code)) - logger.info('run_process() exited') + processor.logger.info('run_process() exited') sys.exit(e.code) except: - logger.exception('Uncaught exception in run_Process:') - + processor.logger.exception('Uncaught exception in run_Process:') diff --git a/coordinator/ProcessorComponents.py b/coordinator/ProcessorComponents.py index f14d498..bea3f11 100644 --- a/coordinator/ProcessorComponents.py +++ b/coordinator/ProcessorComponents.py @@ -15,10 +15,10 @@ from ProcessorAdvisory import ( process_in_job_advisory ) -from ProcessorDeposition import ( - process_in_job_dep, - process_EWS_plotting_dep -) +#from ProcessorDeposition import ( +# process_in_job_dep, +# process_EWS_plotting_dep +#) from ProcessorEnvironment import ( process_in_job_env2_0, process_copy_past_job_env2_0, diff --git a/coordinator/ProcessorDeposition.py b/coordinator/ProcessorDeposition.py index 5ecac01..1adfa69 100644 --- a/coordinator/ProcessorDeposition.py +++ b/coordinator/ProcessorDeposition.py @@ -10,6 +10,8 @@ from string import Template import iris from iris.cube import CubeList +from Processor import Processor +from ProcessorServer import process_pre_job_server_download from ProcessorUtils import ( get_only_existing_globs, subprocess_and_log, @@ -18,142 +20,166 @@ from ProcessorUtils import ( from ews_postprocessing.deposition.deposition_post_processor import DepositionPostProcessor -logger = logging.getLogger('Processor.Deposition') -add_filters_to_sublogger(logger) +class ProcessorDeposition(Processor): -def process_in_job_dep(jobPath,status,config,component): - logger.info('started process_in_job_dep()') + def process_pre_job(self, args): + return process_pre_job_server_download(args) - file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) - file_name = Template(config[component]['InputFileTemplate']).substitute(**config) - logger.info(f"Expecting to work with {file_name}") + def process_in_job(self, jobPath, status, configjson, component) -> object: + return self.process_in_job_dep(jobPath, status, configjson, component) - if os.path.exists(f"{jobPath}/{file_name}"): - logger.info('Directory already exists in job directory, so nothing to do here') - return - logger.info('Copying file from remote server to job directory') + def process_post_job(self, jobPath, configjson): + return self.process_EWS_plotting_dep(jobPath, configjson) - # TODO: perform ssh file transfer in python instead of subprocess - server_name: str = config['ServerName'] - if server_name == "": - cmd_scp = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath] - else: - cmd_scp = ["scp", "-i", config['ServerKey'], 
"-o", "StrictHostKeyChecking=no", - f"{server_name}:{file_path}/{file_name}.tar.gz", jobPath] - description_short = 'dep scp' - description_long = 'scp from server to job directory' - subprocess_and_log(cmd_scp, description_short, description_long) + def __init__(self) -> None: + super().__init__() + logger = logging.getLogger('Processor.Deposition') + add_filters_to_sublogger(logger) - logger.info('untarring the input file') - # TODO: untar file in python (with tarfile module) instead of subprocess - cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath] - description_short = 'dep tars' - description_long = 'untar the downloaded file' - subprocess_and_log(cmd_tar, description_short, description_long) + def process_in_job_dep(self, jobPath, status, config, component): + self.logger.info('started process_in_job_dep()') - # basic check that contents are as expected - # 132 files of NAME .txt timesteps and one summary png file - # if len(glob(f"{jobPath}/{file_name}/deposition_srcs_allregions_C1_T*.txt")) != 56: - # msg = f"Unexpect number of deposition .txt files in input tar file. Expected 56." - # logger.error(msg) - # raise RuntimeError(msg) + file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) + file_name = Template(config[component]['InputFileTemplate']).substitute(**config) - # basic check that contents are as expected (56 timepoints in the file) - cube_wildcard = f"{jobPath}/{file_name}/deposition_srcs_allregions*.nc" - cubes: CubeList = iris.load(cube_wildcard) - for cube in cubes: - coord = cube.coord("time") - timepoint_count = coord.shape[0] - if timepoint_count != 56: - msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}" - logger.error(msg) - raise RuntimeError(msg) + self.logger.info(f"Expecting to work with {file_name}") - proc_out = {} - # Output files available for upload - proc_out['output'] = None - # Processing files available for clearing - proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"] + if os.path.exists(f"{jobPath}/{file_name}"): + self.logger.info('Directory already exists in job directory, so nothing to do here') + return - return proc_out + self.logger.info('Copying file from remote server to job directory') -def process_EWS_plotting_dep(jobPath,config): - '''Returns a list of output files for transfer.''' + # TODO: perform ssh file transfer in python instead of subprocess + server_name: str = config['ServerName'] + if server_name == "": + cmd_scp = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath] + else: + cmd_scp = ["scp", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", + f"{server_name}:{file_path}/{file_name}.tar.gz", jobPath] - logger.info('started process_EWS_plotting_dep()') + description_short = 'dep scp' + description_long = 'scp from server to job directory' + subprocess_and_log(cmd_scp, description_short, description_long) - # initialise environment - regions = config['SubRegionNames'] + self.logger.info('untarring the input file') - deposition_file_name = Template(config['Deposition']['InputFileTemplate']).substitute(**config) + # TODO: untar file in python (with tarfile module) instead of subprocess + cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath] + description_short = 'dep tars' + description_long = 'untar the downloaded file' + subprocess_and_log(cmd_tar, description_short, description_long) - deposition_path = f"{jobPath}/{deposition_file_name}" + # basic check that contents are as expected + # 132 files of NAME .txt timesteps 
and one summary png file + # if len(glob(f"{jobPath}/{file_name}/deposition_srcs_allregions_C1_T*.txt")) != 56: + # msg = f"Unexpect number of deposition .txt files in input tar file. Expected 56." + # self.logger.error(msg) + # raise RuntimeError(msg) - # get the file name from the config - # this file name can be a glob, as long as matches can all be loaded by iris - deposition_data_file_name = Template(config['Deposition']['DataFileTemplate']).substitute(**config) - name_file_wildcard = f"{deposition_path}/{deposition_data_file_name}" + # basic check that contents are as expected (56 timepoints in the file) + cube_wildcard = f"{jobPath}/{file_name}/deposition_srcs_allregions*.nc" + cubes: CubeList = iris.load(cube_wildcard) + for cube in cubes: + coord = cube.coord("time") + timepoint_count = coord.shape[0] + if timepoint_count != 56: + msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}" + self.logger.error(msg) + raise RuntimeError(msg) - EWSPlottingOutputGlobs = [] + proc_out = {} + # Output files available for upload + proc_out['output'] = None + # Processing files available for clearing + proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"] - for region in regions: + return proc_out - output_dir = f"{jobPath}/plotting/{region.lower()}" - Path(output_dir).mkdir(parents=True, exist_ok=True) + def process_EWS_plotting_dep(self, jobPath, config): + '''Returns a list of output files for transfer.''' - sys_config = config['Deposition']['EWS-Plotting']['SysConfig'] - name_extraction_config = config['Deposition']['EWS-Plotting']['NameExtractionConfig'] - run_config = config['Deposition']['EWS-Plotting']['RunConfig'] - run_config_norm = config['Deposition']['EWS-Plotting']['RunConfigNorm'] - chart_config = config['Deposition']['EWS-Plotting'][region]['ChartConfig'] - normalize = config['Deposition']['EWS-Plotting'][region]['Normalize'] - extraction_file_prefix = 'deposition_' + region.lower() + self.logger.info('started process_EWS_plotting_dep()') - # Note that this runs all disease types available + # initialise environment + regions = config['SubRegionNames'] - logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{name_extraction_config}\n{run_config}\n{run_config_norm}\n{chart_config}") + deposition_file_name = Template(config['Deposition']['InputFileTemplate']).substitute(**config) - depo_processor = DepositionPostProcessor() - depo_processor.set_param_config_files(sys_config_file_arg = sys_config, - depo_name_extraction_config_file_arg = name_extraction_config, - chart_config_file_arg = chart_config, - depo_plotting_run_config_file_arg = run_config, - depo_plotting_normalized_run_config_file_arg = run_config_norm, - name_file_wildcard_arg = name_file_wildcard, - wheat_sources_dir_arg = deposition_path, - output_dir_arg = output_dir, - issue_date_arg = config['StartString'], - extraction_file_prefix_arg = extraction_file_prefix) + deposition_path = f"{jobPath}/{deposition_file_name}" + # get the file name from the config + # this file name can be a glob, as long as matches can all be loaded by iris + deposition_data_file_name = Template(config['Deposition']['DataFileTemplate']).substitute(**config) + name_file_wildcard = f"{deposition_path}/{deposition_data_file_name}" - # asia/east africa env suit should not perform normalization, false gets passed here for these areas - depo_processor.name_extract_params.NORMALIZE = (normalize.upper() == "TRUE") + EWSPlottingOutputGlobs = [] - depo_processor.process() + for region in regions: 
- # check the output - EWSPlottingOutputDir = f"{output_dir}/images/" - #EWSPlottingOutputGlobs += [ - # # daily plots - # f"{EWSPlottingOutputDir}Daily/deposition_{region.lower()}_*_daily_20*.png", - # # weekly plots - # f"{EWSPlottingOutputDir}Weekly/deposition_{region.lower()}_*_total_20*.png"] + output_dir = f"{jobPath}/plotting/{region.lower()}" - EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"] + Path(output_dir).mkdir(parents=True, exist_ok=True) - EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + sys_config = config['Deposition']['EWS-Plotting']['SysConfig'] + name_extraction_config = config['Deposition']['EWS-Plotting']['NameExtractionConfig'] + run_config = config['Deposition']['EWS-Plotting']['RunConfig'] + run_config_norm = config['Deposition']['EWS-Plotting']['RunConfigNorm'] + chart_config = config['Deposition']['EWS-Plotting'][region]['ChartConfig'] + normalize = config['Deposition']['EWS-Plotting'][region]['Normalize'] + extraction_file_prefix = 'deposition_' + region.lower() - # check there is some output from EWS-plotting - if not EWSPlottingOutputGlobs: - logger.error('EWS-Plotting did not produce any output') - raise RuntimeError + # Note that this runs all disease types available - # provide list for transfer - EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)]) + self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{name_extraction_config}\n{run_config}\n{run_config_norm}\n{chart_config}") - return EWSPlottingOutputs + depo_processor = DepositionPostProcessor() + depo_processor.set_param_config_files(sys_config_file_arg = sys_config, + depo_name_extraction_config_file_arg = name_extraction_config, + chart_config_file_arg = chart_config, + depo_plotting_run_config_file_arg = run_config, + depo_plotting_normalized_run_config_file_arg = run_config_norm, + name_file_wildcard_arg = name_file_wildcard, + wheat_sources_dir_arg = deposition_path, + output_dir_arg = output_dir, + issue_date_arg = config['StartString'], + extraction_file_prefix_arg = extraction_file_prefix) + + + # asia/east africa env suit should not perform normalization, false gets passed here for these areas + depo_processor.name_extract_params.NORMALIZE = (normalize.upper() == "TRUE") + + depo_processor.process() + + # check the output + EWSPlottingOutputDir = f"{output_dir}/images/" + #EWSPlottingOutputGlobs += [ + # # daily plots + # f"{EWSPlottingOutputDir}Daily/deposition_{region.lower()}_*_daily_20*.png", + # # weekly plots + # f"{EWSPlottingOutputDir}Weekly/deposition_{region.lower()}_*_total_20*.png"] + + EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"] + + EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + + # check there is some output from EWS-plotting + if not EWSPlottingOutputGlobs: + self.logger.error('EWS-Plotting did not produce any output') + raise RuntimeError + + # provide list for transfer + EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)]) + + return EWSPlottingOutputs + + +if __name__ == '__main__': + processor = ProcessorDeposition() + args_dict: dict = processor.parse_and_check_args() + processor.run_Process(args_dict) -- GitLab
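Note on the logging TODO in Processor.setup() ("smtp handler can only use tls, but ssl is more secure. Look into defining/writing a suitable smtp handler"): a minimal sketch of a buffering handler that uses implicit SSL (smtplib.SMTP_SSL, typically port 465) instead of STARTTLS. The class name and constructor arguments mirror the keys already passed to handler_buffered_email in logConfigDict, but this is a suggestion only, not part of the patch.

    # Minimal sketch, not part of the patch: buffered email logging over SMTP_SSL.
    import logging.handlers
    import smtplib
    from email.message import EmailMessage


    class BufferingSMTPSSLHandler(logging.handlers.BufferingHandler):

        def __init__(self, server, credentials, fromaddr, toaddrs, subject, capacity=100):
            super().__init__(capacity)
            self.host, self.port = server          # e.g. (gmail_config['host'], 465)
            self.username, self.password = credentials
            self.fromaddr = fromaddr
            self.toaddrs = toaddrs
            self.subject = subject

        def flush(self):
            # send everything buffered so far as one email, then clear the buffer
            if not self.buffer:
                return
            body = "\r\n".join(self.format(record) for record in self.buffer)
            msg = EmailMessage()
            msg['Subject'] = self.subject
            msg['From'] = self.fromaddr
            msg['To'] = ', '.join(self.toaddrs)
            msg.set_content(body)
            try:
                with smtplib.SMTP_SSL(self.host, self.port) as smtp:
                    smtp.login(self.username, self.password)
                    smtp.send_message(msg)
            finally:
                self.buffer = []

If used, the 'handler_buffered_email' entry in logConfigDict would point its 'class' key at a handler like this and take port 465 from the email credentials file; the rest of the dict could stay as it is.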