#Processor.py
'''To be used for handling any operational component of the wheat rust early
warning system.

Command-line options specify which component and which json configuration
files to run, and complete any remaining elements of the configuration files.

Component varies by choices of process_pre_job(), process_in_job() and
process_EWS_plotting(). This runs all dates of a single forecast.

Example usage::

    $ conda activate py3EWSepi
    $ python Processor.py -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
or::
    $ ./run_Processor.sh -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
'''

from typing import List

print("Make sure to activate the `py3EWSepi` conda environment!")
print("Make sure that flagdir package is available (on PYTHONPATH)")

import argparse
import datetime
import json
import logging
import logging.config
import os
from pathlib import Path
import shutil
import sys

# gitlab projects
from flagdir import jobStatus # created by jws52

# submodules of this project
import BufferingSMTPHandler
import ProcessorComponents
from ProcessorUtils import (
        append_item_to_list,
        clear_up,
        endScript,
        endJob,
        open_and_check_config,
        PasswordODKFilter,
        short_name
)

# initialise default values for configuration

script_name = 'Processor'

timeNow = datetime.datetime.today()
dateToday = timeNow.date()
todayString = timeNow.strftime('%Y%m%d')
nowString = timeNow.strftime('%Y%m%d-%H%M-%S')

# get the path to this script
script_path = os.path.dirname(__file__)+'/'

coordinator_path = script_path

# log file for all jobs
log_path_project = f"{coordinator_path}logs/log.txt"

# job-specific log file will be written here until a job directory exists, when it will be moved there
log_path_default = f"{coordinator_path}logs/log_{nowString}.txt"

# get the email credentials file path from the environment variables
assert 'EMAIL_CRED' in os.environ
email_credential_fn = os.environ['EMAIL_CRED']
assert os.path.exists(email_credential_fn)

with open(email_credential_fn,'r') as f:
    gmail_config = json.load(f)

# check contents
required_keys = ['user','pass','host','port','toaddrs']
for required_key in required_keys:
    assert required_key in gmail_config
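# Illustrative sketch only: the EMAIL_CRED file is expected to be json holding at
# least the keys checked above. The values below are placeholders, not real
# credentials or addresses:
#
# {
#     "user": "sender@example.com",
#     "pass": "an-app-password",
#     "host": "smtp.gmail.com",
#     "port": 587,
#     "toaddrs": ["maintainer@example.com"]
# }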
""", 'datefmt' : '%Y-%m-%d %H:%M:%S' } }, 'filters' : { 'mask_passwords' : { '()' : PasswordODKFilter } }, 'handlers' : { # logging for project 'handler_rot_file' : { 'class' : 'logging.handlers.TimedRotatingFileHandler', 'level' : 'INFO', 'formatter' : 'simple', 'filters' : ['mask_passwords'], 'filename' : log_path_project, # time of day on given day 'when' : 'W2', # rotate file every Wednesday 'atTime' : datetime.time(1,0,0), # at 1am local time 'backupCount' : 12, }, # logging for job 'handler_file' : { 'class' : 'logging.FileHandler', 'level' : 'INFO', 'formatter' : 'simple', 'filters' : ['mask_passwords'], 'filename' : log_path_default, 'mode' : 'a', # 'a' for append }, # to email errors to maintainers 'handler_buffered_email': { 'class': 'BufferingSMTPHandler.BufferingSMTPHandler', 'level': 'ERROR', 'server': (gmail_config['host'], gmail_config['port']), # host, port. 465 fsor SSL, 587 for tls 'credentials': (gmail_config['user'], gmail_config['pass']), 'fromaddr': gmail_config['user'], 'toaddrs': gmail_config['toaddrs'], 'subject': 'ERROR in EWS Processor', 'formatter': 'detailed', 'filters': ['mask_passwords'], 'capacity': 100 }, }, 'loggers' : { # this is activated when this script is imported # i.e. with logging.getLogger('Process.') script_name : { 'level' : 'INFO', 'handlers' : ['handler_rot_file','handler_file','handler_buffered_email'], 'propagate' : True, }, # this is activated when this script is called on the command line # or from a bash script # i.e. with logging.getLogger(__name__) when name == '__main__' '__main__' : { 'level' : 'INFO', 'handlers' : ['handler_rot_file','handler_file','handler_buffered_email'], 'propagate' : True, } } } logging.config.dictConfig(logConfigDict) print(__name__) # create a logger named according to how the file is called #logger = logging.getLogger(__name__) logger = logging.getLogger(script_name) loglevels = {'debug':logging.DEBUG, 'info':logging.INFO, 'warning':logging.WARNING, 'error':logging.ERROR, 'critical':logging.CRITICAL, } def move_default_logfile_handler(dstPathName,srcPathName=log_path_default,FileHandlerName='handler_file',): '''For on-the-fly move of logging from default file to another. 
def move_default_logfile_handler(dstPathName, srcPathName=log_path_default, FileHandlerName='handler_file'):
    '''For on-the-fly move of logging from the default file to another.

    Copies the contents of the source log file to the destination, switches
    the file handler in the logger, then retires the source log file.'''

    logger.info(f"Moving logfile location from:\n{srcPathName}\nto:\n{dstPathName}")

    # copy old log file to new filename
    srcPath = Path(srcPathName)
    dstPath = Path(dstPathName)

    assert srcPath.exists()
    assert dstPath.parent.is_dir()

    oldFileHandler = [h for h in logger.handlers if h.name == FileHandlerName][0]
    oldFormatter = oldFileHandler.formatter

    # define new file handler
    newfilehandler = logging.FileHandler(dstPath, mode=oldFileHandler.mode)
    newfilehandler.setLevel(oldFileHandler.level)
    newfilehandler.setFormatter(oldFormatter)

    shutil.copyfile(srcPath, dstPath)

    # add handler for destination file
    logger.info('Adding new logging file handler to destination path')
    logger.addHandler(newfilehandler)

    # remove handler for source file
    logger.info('Stopping write to old file handler')
    logger.removeHandler(oldFileHandler)
    oldFileHandler.close()
    logger.info('Successfully stopped write to old file handler')

    # retire the old log file by renaming it, since all content is available in the new log file
    logger.info('Renaming old log file, since all content is available in the new log file')
    os.rename(srcPathName, srcPathName + 'removed')

    return
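# Typical use (as in run_Process below): once the job directory exists, the
# job-specific log file is relocated into it, e.g.
#     move_default_logfile_handler(dstPathName=f"{jobPath}/log.txt")
# (illustrative call only; jobPath is defined inside run_Process)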
def parse_and_check_args(todayString) -> dict:

    # define the command line arguments
    my_parser = argparse.ArgumentParser(description='Command-line arguments for coordinator script of env suitability model')

    # Add the arguments
    # positional arguments do not start with - or -- and are always required
    # optional arguments start with - or -- and default is required = False

    my_parser.add_argument(
            '-p', '--component',
            type = str,
            choices = list(short_name.keys()),
            required = True,
            dest = 'component',
            help = '''Name of EWS component to process, which must be present
            in the config file.''')

    my_parser.add_argument(
            '-c', '--config',
            metavar = 'path',
            type = str,
            nargs = '+', # require at least one path
            dest = 'config_paths',
            required = True,
            #default = ['config_Ethiopia_template_stripe.json'], # remove once live
            #default = ['config_Bangladesh_template_stripe.json'], # remove once live
            #default = ['config_Nepal_template_stripe.json'], # remove once live
            help = '''path to one or more config files. More than one can be
            provided, in which case each is worked on in turn (e.g. one each
            for stripe, stem, leaf). Do not place other options between these.''')

    my_parser.add_argument(
            '-l', '--loglevel',
            action = 'store',
            choices = list(loglevels.keys()),
            default = 'info',
            help = 'verbosity of log messaging (debug, info, warning, error, critical)\n default is info',
            dest = 'log_level', # this names the attribute that will be parsed
            )

    my_parser.add_argument(
            '--islive',
            action = 'store_true',
            help = 'If live, email messages are sent to maintainers for warnings and errors',
            dest = 'live',
            )

    my_parser.add_argument(
            '-s', '--start-date', '-i', '--initial-date',
            metavar = 'YYYYMMDD',
            action = 'store',
            default = todayString,
            help = 'Initial day of calculation, starting at 00 UTC (Default is today)',
            dest = 'start_date',
            )

    my_parser.add_argument(
            '--noupload',
            action = 'store_true',
            help = 'if present, results of the script are not uploaded to the willow public directory'
            )

    my_parser.add_argument(
            '--clearup',
            action = 'store_true',
            help = 'whether to delete mid-process files at the end of a successful job',
            dest = 'clearup',
            )

    # get an object holding all of the args
    args = my_parser.parse_args()

    # Check the args
    logger.info(f"Command-line options are:\n{args}")

    if not isinstance(args.config_paths, list):
        logger.error('Expecting a list of config paths')
        raise RuntimeError

    # check the start date string
    if args.start_date != todayString:
        try:
            # check date string is formatted correctly
            provided_start_date = datetime.datetime.strptime(args.start_date, '%Y%m%d')
            today_date = datetime.datetime.strptime(todayString, '%Y%m%d')

            # early limit is quite arbitrary, but this is the earliest year of available survey data for Ethiopia
            date_limit_early = datetime.datetime.strptime('20070101', '%Y%m%d')
            assert date_limit_early < provided_start_date

            assert provided_start_date <= today_date

        except (ValueError, AssertionError) as e:
            logger.exception("Provided start date string is formatted incorrectly or out of range")
            raise

    dictionary: dict = vars(args)
    return dictionary


def set_log_level(log_level: str):
    new_log_level = loglevels[log_level]

    # modify log level of all loggers
    logger.info(f"logging level being changed to {new_log_level} because of command-line option")
    loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
    for logger_i in loggers:
        logger_i.setLevel(new_log_level)
def build_universal_config(configs: list, component: str, universal_config=None):
    '''Gathers the aspects of each config dict in configs that must be common
    to them all.'''

    # initialise universal config
    if not universal_config:
        universal_config = {
            'WorkspacePathout' : set(),
            'ProcessPreJob' : set(),
            'ProcessInJob' : set(),
            'ProcessEWSPlotting' : set(),
            'ServerPath' : set(),
            'ServerName' : set(),
            'ServerKey' : set()}

    keys = universal_config.keys()

    # get value of each key from each config file
    for configFile in configs:

        try:
            config_i = open_and_check_config(configFile)
        except:
            logger.exception(f"Failure in opening or checking config {configFile}")
            endScript(premature=True)

        for key in keys:
            try:
                universal_config[key].add(config_i[key])
            except KeyError:
                # key must be in component sub-dict
                universal_config[key].add(config_i[component][key])

    # Check for and keep only one value per key
    for key in keys:

        if len(universal_config[key]) > 1:
            logger.error(f"Config files point to multiple {key} but this script can only handle one.")
            endScript(premature=True)

        universal_config[key] = universal_config[key].pop()

    return universal_config
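# Illustrative example only (hypothetical file names and values): given two
# config files that both contain, e.g.
#     {"WorkspacePathout": "/storage/", "Deposition": {"ServerName": "willow", ...}, ...}
# then
#     build_universal_config(["config_a.json", "config_b.json"], "Deposition")
# returns a flat dict with a single value per key, e.g.
#     {"WorkspacePathout": "/storage/", "ServerName": "willow", ...}
# and ends the script prematurely if the configs disagree on any of these keys.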
def run_Process(args: dict):

    # check initial state of each config file, and gather terms that must apply
    # across all provided configs

    if not args["live"]:
        # remove the log handler that would send emails
        logger.handlers = [h for h in logger.handlers if not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]

    config_paths: List[str] = args['config_paths']
    component: str = args['component']
    start_date: str = args['start_date']
    noupload: bool = args['noupload']
    clearup: bool = args['clearup']

    universal_config = build_universal_config(config_paths, component)

    universal_config['StartString'] = start_date

    logger.info(f"Universal config is\n{json.dumps(universal_config,indent=2)}")

    workspacePath = universal_config['WorkspacePathout']

    process_pre_job = getattr(ProcessorComponents, universal_config['ProcessPreJob'])
    process_in_job = getattr(ProcessorComponents, universal_config['ProcessInJob'])
    process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting'])

    # determine job directory
    jobPath = f'{workspacePath}{short_name[component]}_{start_date}'

    logger.info(f"Job path will be {jobPath}")

    # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC
    startTime = datetime.datetime.strptime(start_date+'03','%Y%m%d%H')

    # run any checks before creating a job directory
    # if this fails, then make a note once there is a job directory
    ready = process_pre_job(args)

    if not ready:
        logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it")

    # create job directory
    Path(jobPath).mkdir(parents=True, exist_ok=True)

    # lock job directory
    with jobStatus(jobPath) as status: #lawrence comment in/out

        # check for a status file in job directory
        if status.had_initial_status:
            logger.info(f"Job path already exists and has status {status.status}")

            endScript(premature = status.status not in ['SUCCESS','INPROGRESS'])

        logger.info(f"Current status of job directory is {status.status}")

        # now that we have a useable job directory, move the log file there
        logPathJob = f"{jobPath}/log.txt"

        move_default_logfile_handler(dstPathName=logPathJob)

        # make a record if process_pre_job failed
        if not ready:
            logger.error(f"Process_pre_job raised an error so making a record in the job file. For details, see earlier warnings log")

            status.reset('ERROR')

            endJob(status,ignore_inprogress=True,premature=False)

        # files and directories that will be uploaded to public server
        FilesToSend = []

        # files and directories that will be earmarked for removal after a
        # successful job
        paths_to_clear = []

        logger.info('Starting to work on each configuration')

        for configIndex, configtemplate in enumerate(config_paths):

            config_paths_length: int = len(config_paths)
            logger.info(f'Working on config {configIndex+1} of {config_paths_length}')

            try:
                configjson = open_and_check_config(configtemplate)
            except:
                logger.exception(f"Failure in opening or checking config {configtemplate}")
                # TODO: This case should test flagdir.jobStatus.__exit__()
                raise
                # endJob('ERROR',premature=True)

            # provide specific case details to template config
            configjson['StartTime'] = startTime.strftime('%Y-%m-%d-%H%M')
            configjson['StartString'] = start_date

            # from configtemplate create configFileName to describe the specific job
            component: str = component
            configFileName = f"{os.path.basename(configtemplate).replace('.json','')}_{component}"

            configjson['ConfigFilePath'] = configFileName

            # write the complete configuration file to job directory
            with open(f"{jobPath}/{configFileName}.json",'w') as write_file:
                json.dump(configjson,write_file,indent=4)

            proc_description = universal_config['ProcessInJob']
            try:
                proc_out = process_in_job(jobPath,status,configjson,component)
            except:
                logger.exception(f"Error in {proc_description}()")
                status.reset('ERROR')
                endJob(status,premature=True)

            # Set default case
            # This would be improved by implementing a class structure
            if proc_out is None:
                proc_out = {
                    'output' : None,
                    'clearup' : None}

            if 'output' in proc_out.keys():
                append_item_to_list(
                        proc_out['output'],
                        FilesToSend,
                        proc_description,
                        status)

            if 'clearup' in proc_out.keys():
                append_item_to_list(
                        proc_out['clearup'],
                        paths_to_clear,
                        proc_description,
                        status)

            # Run EWS-plotting command
            proc_description = universal_config['ProcessEWSPlotting']
            try:
                EWSPlottingOutputs = process_EWS_plotting(jobPath,configjson)
            except:
                logger.exception(f"Error in {proc_description}()")
                status.reset('ERROR')
                endJob(status,premature=True)

            logger.info('Finished with EWS-Plotting, appending images to list for transfer')

            if EWSPlottingOutputs:
                append_item_to_list(
                        EWSPlottingOutputs,
                        FilesToSend,
                        proc_description,
                        status)

            logger.info(f'Finished with config {configIndex+1} of {config_paths_length}')

        # send results to remote server
        if not noupload:
            try:
                ProcessorComponents.upload(universal_config, FilesToSend, component)
            except IndexError:
                status.reset('WARNING')
            except:
                logger.exception('Failed to upload files to remote server')
                status.reset('ERROR')
                endJob(status,premature=True)

            # check if there is a second location on willow to provide results
            if 'ServerPathExtra' in configjson[component]:

                logger.info('There is an extra path to send results to:')
                extra_path = configjson[component]['ServerPathExtra']
                logger.info(extra_path)

                universal_config_extra = universal_config.copy()
                universal_config_extra['ServerPath'] = extra_path

                try:
                    ProcessorComponents.upload(universal_config_extra, FilesToSend, component)
                except IndexError:
                    status.reset('WARNING')
                except:
                    logger.exception('Failed to upload files to extra directory on remote server')
                    status.reset('ERROR')
                    endJob(status,premature=True)

        else:
            logger.info('Because noupload argument was present, not sending results to remote server')

        status.reset('SUCCESS')

        if status.is_success() and clearup:

            logger.info('Clearing up')
clearup_dest_dir = f"{workspacePath}/clearup/{short_name[component]}_{start_date}/" Path(clearup_dest_dir).mkdir(parents=True, exist_ok=True) logger.info(f"While developing, moving directories to this directory : {clearup_dest_dir}") clear_up( paths_to_clear, clearup_dest = clearup_dest_dir) endScript(premature=False) if __name__ == '__main__': try: logger.info("==========") logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}") # load configurations args_dict: dict = parse_and_check_args(todayString) set_log_level(args_dict['log_level']) run_Process(args_dict) except SystemExit: logger.info('run_process() exited') pass except: logger.exception('Uncaught exception in run_Process:')