diff --git a/coordinator/ProcessorOnlyPlot.py b/coordinator/ProcessorOnlyPlot.py
new file mode 100644
index 0000000000000000000000000000000000000000..6e034912252d5a5181d3d0ea8cb118dea2e84bf8
--- /dev/null
+++ b/coordinator/ProcessorOnlyPlot.py
@@ -0,0 +1,565 @@
+#ProcessorOnlyPlot.py
+'''To be used for handling the plotting step of the wheat rust early warning
+system.
+
+Command-line options specify which component to process and which json
+configuration files to use, and complete any remaining elements of the
+configuration files.
+
+Component varies by the choice of process_EWS_plotting(). Unlike the full
+Processor, this variant runs only the EWS-plotting step.
+
+This runs all dates of a single forecast.
+
+Example usage::
+    $ conda activate py3EWSepi
+    $ python ProcessorOnlyPlot.py -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
+or::
+    $ ./run_Processor.sh -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
+'''
+from typing import List
+
+
+print("Make sure to activate the `py3EWSepi` conda environment!")
+print("Make sure that the flagdir package is available (on PYTHONPATH)")
+
+import argparse
+import datetime
+import json
+import logging
+import logging.config
+import os
+from pathlib import Path
+import shutil
+import sys
+
+# gitlab projects
+from flagdir import jobStatus  # created by jws52
+
+# submodules of this project
+import BufferingSMTPHandler
+import ProcessorComponents
+from ProcessorUtils import (
+    append_item_to_list,
+    clear_up,
+    endScript,
+    endJob,
+    open_and_check_config,
+    PasswordODKFilter,
+    short_name
+)
+
+# initialise default values for configuration
+
+script_name = 'Processor'
+
+timeNow = datetime.datetime.today()
+dateToday = timeNow.date()
+todayString = timeNow.strftime('%Y%m%d')
+nowString = timeNow.strftime('%Y%m%d-%H%M-%S')
+
+# get the path to this script
+script_path = os.path.dirname(__file__)+'/'
+
+coordinator_path = script_path
+
+# log file for all jobs
+log_path_project = f"{coordinator_path}logs/log.txt"
+
+# the job-specific log file will be written here until a job directory exists,
+# when it will be moved there
+log_path_default = f"{coordinator_path}logs/log_{nowString}.txt"
+
+# get the email credentials file path from the environment variables
+assert 'EMAIL_CRED' in os.environ
+email_credential_fn = os.environ['EMAIL_CRED']
+assert os.path.exists(email_credential_fn)
+
+with open(email_credential_fn, 'r') as f:
+    gmail_config = json.load(f)
+
+# check contents
+required_keys = ['user', 'pass', 'host', 'port', 'toaddrs']
+for required_key in required_keys:
+    assert required_key in gmail_config
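+
+# For reference, the credentials file must provide the keys checked above. A
+# minimal illustrative example (placeholder values, not real credentials):
+#
+#     {
+#         "user": "ews.sender@example.com",
+#         "pass": "an-app-specific-password",
+#         "host": "smtp.gmail.com",
+#         "port": 465,
+#         "toaddrs": ["maintainer@example.com"]
+#     }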
+
+# load the logging config from a python dictionary (could be loaded from a json file)
+# TODO: the smtp handler can only use TLS, but SSL is more secure. Look into
+# defining/writing a suitable smtp handler
+logConfigDict = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'simple': {
+            'format': '%(name)s : %(levelname)s - %(message)s'
+        },
+        'detailed': {
+            'format': f"""
+                For command:
+                {' '.join(sys.argv)}
+
+                %(levelname)s in %(name)s encountered at %(asctime)s:
+
+                %(message)s
+
+                Resolve this error and restart processing.
+                """,
+            'datefmt': '%Y-%m-%d %H:%M:%S'
+        }
+    },
+    'filters': {
+        'mask_passwords': {
+            '()': PasswordODKFilter
+        }
+    },
+    'handlers': {
+
+        # logging for the project as a whole
+        'handler_rot_file': {
+            'class': 'logging.handlers.TimedRotatingFileHandler',
+            'level': 'INFO',
+            'formatter': 'simple',
+            'filters': ['mask_passwords'],
+            'filename': log_path_project,
+            # rotate at a given time of day on a given day
+            'when': 'W2',  # rotate file every Wednesday
+            'atTime': datetime.time(1, 0, 0),  # at 1am local time
+            'backupCount': 12,
+        },
+
+        # logging for the current job
+        'handler_file': {
+            'class': 'logging.FileHandler',
+            'level': 'INFO',
+            'formatter': 'simple',
+            'filters': ['mask_passwords'],
+            'filename': log_path_default,
+            'mode': 'a',  # 'a' for append
+        },
+        # to email errors to maintainers
+        'handler_buffered_email': {
+            'class': 'BufferingSMTPHandler.BufferingSMTPHandler',
+            'level': 'ERROR',
+            'server': (gmail_config['host'], gmail_config['port']),  # host, port. 465 for SSL, 587 for TLS
+            'credentials': (gmail_config['user'], gmail_config['pass']),
+            'fromaddr': gmail_config['user'],
+            'toaddrs': gmail_config['toaddrs'],
+            'subject': 'ERROR in EWS Processor',
+            'formatter': 'detailed',
+            'filters': ['mask_passwords'],
+            'capacity': 100
+        },
+    },
+    'loggers': {
+        # this is activated when this script is imported,
+        # i.e. with logging.getLogger('Processor')
+        script_name: {
+            'level': 'INFO',
+            'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email'],
+            'propagate': True,
+        },
+        # this is activated when this script is called on the command line
+        # or from a bash script,
+        # i.e. with logging.getLogger(__name__) when __name__ == '__main__'
+        '__main__': {
+            'level': 'INFO',
+            'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email'],
+            'propagate': True,
+        }
+    }
+}
+
+logging.config.dictConfig(logConfigDict)
+
+# create a logger named according to how the file is called
+#logger = logging.getLogger(__name__)
+logger = logging.getLogger(script_name)
+
+loglevels = {'debug': logging.DEBUG,
+             'info': logging.INFO,
+             'warning': logging.WARNING,
+             'error': logging.ERROR,
+             'critical': logging.CRITICAL,
+             }
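+
+# The 'mask_passwords' filter above redacts credentials before log records
+# reach any handler. The real implementation is PasswordODKFilter in
+# ProcessorUtils; a hypothetical sketch of such a filter, for orientation only:
+#
+#     import re
+#
+#     class ExamplePasswordFilter(logging.Filter):
+#         def filter(self, record: logging.LogRecord) -> bool:
+#             # rewrite the message in place; never drop the record
+#             record.msg = re.sub(r'password\S*', 'password *****', str(record.msg))
+#             return True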
+
+
+def move_default_logfile_handler(dstPathName, srcPathName=log_path_default, FileHandlerName='handler_file'):
+    '''For on-the-fly move of logging from the default file to another. Copies
+    the contents of the source log file to the destination, switches the file
+    handler in the logger, then retires the source log file.'''
+
+    logger.info(f"Moving logfile location from:\n{srcPathName}\nto:\n{dstPathName}")
+
+    # copy old log file to new filename
+    srcPath = Path(srcPathName)
+    dstPath = Path(dstPathName)
+    assert srcPath.exists()
+    assert dstPath.parent.is_dir()
+
+    oldFileHandler = [h for h in logger.handlers if h.name == FileHandlerName][0]
+    oldFormatter = oldFileHandler.formatter
+
+    # define new file handler
+    newfilehandler = logging.FileHandler(dstPath, mode=oldFileHandler.mode)
+    newfilehandler.setLevel(oldFileHandler.level)
+    newfilehandler.setFormatter(oldFormatter)
+
+    shutil.copyfile(srcPath, dstPath)
+
+    # add handler for destination file
+    logger.info('Adding new logging file handler to destination path')
+
+    logger.addHandler(newfilehandler)
+
+    # remove handler for source file
+    logger.info('Stopping write to old file handler')
+    logger.removeHandler(oldFileHandler)
+    oldFileHandler.close()
+    logger.info('Successfully stopped write to old file handler')
+
+    # retire the old log file (renamed rather than deleted), since all content
+    # is available in the new log file
+    logger.info('Retiring old log file, since all content is available in the new log file')
+    os.rename(srcPathName, srcPathName + 'removed')
+
+    return
+
+
+def parse_and_check_args(todayString) -> dict:
+
+    # define the command line arguments
+    my_parser = argparse.ArgumentParser(description='Command-line arguments for coordinator script of env suitability model')
+
+    # Add the arguments
+    # positional arguments do not start with - or -- and are always required
+    # optional arguments start with - or -- and default to required = False
+
+    my_parser.add_argument(
+        '-p', '--component',
+        type=str,
+        choices=list(short_name.keys()),
+        required=True,
+        dest='component',
+        help='''Name of EWS component to process, which must be present
+        in the config file.''')
+
+    my_parser.add_argument(
+        '-c', '--config',
+        metavar='path',
+        type=str,
+        nargs='+',  # require at least one path
+        dest='config_paths',
+        required=True,
+        #default = ['config_Ethiopia_template_stripe.json'], # remove once live
+        #default = ['config_Bangladesh_template_stripe.json'], # remove once live
+        #default = ['config_Nepal_template_stripe.json'], # remove once live
+        help='''path to one or more config files. When more than one is provided,
+        each is worked on in turn (e.g. one each for stripe, stem, leaf).
+        Do not place other options between these.''')
+
+    my_parser.add_argument(
+        '-l', '--loglevel',
+        action='store',
+        choices=list(loglevels.keys()),
+        default='info',
+        help='verbosity of log messaging (debug, info, warning, error, critical)\n default is info',
+        dest='log_level',  # this names the attribute that will be parsed
+        )
+
+    my_parser.add_argument(
+        '--islive',
+        action='store_true',
+        help='if live, email messages are sent to maintainers for warnings and errors',
+        dest='live',
+        )
+
+    my_parser.add_argument(
+        '-s', '--start-date', '-i', '--initial-date',
+        metavar='YYYYMMDD',
+        action='store',
+        default=todayString,
+        help='initial day of calculation, starting at 00 UTC (default is today)',
+        dest='start_date',
+        )
+
+    my_parser.add_argument(
+        '--noupload',
+        action='store_true',
+        help='do not save results of the script to the willow public directory'
+        )
+
+    my_parser.add_argument(
+        '--clearup',
+        action='store_true',
+        help='whether to delete mid-process files at the end of a successful job',
+        dest='clearup',
+        )
+
+    # get an object holding all of the args
+    args = my_parser.parse_args()
+
+    # Check the args
+
+    logger.info(f"Command-line options are:\n{args}")
+
+    if not isinstance(args.config_paths, list):
+        logger.error('Expecting a list of config paths')
+        raise RuntimeError
+
+    # check the start date string
+    if args.start_date != todayString:
+        try:
+            # check the date string is formatted correctly
+            provided_start_date = datetime.datetime.strptime(args.start_date, '%Y%m%d')
+            today_date = datetime.datetime.strptime(todayString, '%Y%m%d')
+
+            # the early limit is somewhat arbitrary: it is the earliest year of
+            # available survey data for Ethiopia
+            date_limit_early = datetime.datetime.strptime('20070101', '%Y%m%d')
+            assert date_limit_early < provided_start_date
+            assert provided_start_date <= today_date
+
+        except (ValueError, AssertionError):
+            logger.exception("Provided start date string is formatted incorrectly or out of range")
+            raise
+
+    dictionary: dict = vars(args)
+    return dictionary
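+
+# For example (hypothetical paths and values), the command line
+#     python ProcessorOnlyPlot.py -p Deposition --config a.json b.json -s 20200715
+# yields an args dict like:
+#     {'component': 'Deposition', 'config_paths': ['a.json', 'b.json'],
+#      'log_level': 'info', 'live': False, 'start_date': '20200715',
+#      'noupload': False, 'clearup': False}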
+
+
+def set_log_level(log_level: str):
+    new_log_level = loglevels[log_level]
+
+    # modify the log level of all loggers
+    logger.info(f"logging level being changed to {new_log_level} because of command-line option")
+    loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
+    for logger_i in loggers:
+        logger_i.setLevel(new_log_level)
+
+
+def build_universal_config(configs: list, component: str, universal_config=None):
+    '''Gathers the aspects of each config dict that must be common to them
+    all, and checks that they agree across all provided configs.'''
+
+    # initialise universal config
+    if not universal_config:
+        universal_config = {
+            'WorkspacePathout': set(),
+            'ProcessPreJob': set(),
+            'ProcessInJob': set(),
+            'ProcessEWSPlotting': set(),
+            'ServerPath': set(),
+            'ServerName': set(),
+            'ServerKey': set()}
+
+    keys = universal_config.keys()
+
+    # get the value of each key from each config file
+    for configFile in configs:
+
+        try:
+            config_i = open_and_check_config(configFile)
+        except:
+            logger.exception(f"Failure in opening or checking config {configFile}")
+            endScript(premature=True)
+
+        for key in keys:
+
+            try:
+                universal_config[key].add(config_i[key])
+            except KeyError:
+                # the key must be in the component sub-dict
+                universal_config[key].add(config_i[component][key])
+
+    # check for, and keep, only one value per key
+    for key in keys:
+
+        if len(universal_config[key]) > 1:
+            logger.error(f"Config files point to multiple {key} but this script can only handle one.")
+            endScript(premature=True)
+
+        universal_config[key] = universal_config[key].pop()
+
+    return universal_config
+
+
+def run_Process(args: dict):
+
+    # check the initial state of each config file, and gather terms that must
+    # apply across all provided configs
+
+    if not args["live"]:
+        # remove the log handler that would send emails
+        logger.handlers = [h for h in logger.handlers if not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]
+
+    config_paths: List[str] = args['config_paths']
+    component: str = args['component']
+    start_date: str = args['start_date']
+    noupload: bool = args['noupload']
+    clearup: bool = args['clearup']
+
+    universal_config = build_universal_config(config_paths, component)
+
+    universal_config['StartString'] = start_date
+
+    logger.info(f"Universal config is\n{json.dumps(universal_config, indent=2)}")
+
+    workspacePath = universal_config['WorkspacePathout']
+
+    process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting'])
+
+    # determine the job directory
+    jobPath = f'{workspacePath}{short_name[component]}_{start_date}'
+
+    logger.info(f"Job path will be {jobPath}")
+
+    # note that the file date represents the preceding 3 hours, so the day's
+    # data starts at file timestamp 0300 UTC
+    startTime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H')
+
+    # create the job directory
+    Path(jobPath).mkdir(parents=True, exist_ok=True)
+
+    # lock the job directory
+    with jobStatus(jobPath) as status:
+
+        # check for a status file in the job directory
+        if status.had_initial_status:
+            logger.info(f"Job path already exists and has status {status.status}")
+
+            endScript(premature=status.status not in ['SUCCESS', 'INPROGRESS'])
+
+        logger.info(f"Current status of job directory is {status.status}")
+
+        # now that we have a usable job directory, move the log file there
+        logPathJob = f"{jobPath}/log.txt"
+
+        move_default_logfile_handler(dstPathName=logPathJob)
+
+        # files and directories that will be uploaded to the public server
+        FilesToSend = []
+
+        # files and directories that will be earmarked for removal after a
+        # successful job
+        paths_to_clear = []
+
+        logger.info('Starting to work on each configuration')
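+
+        # The jobStatus context manager (from the flagdir package) maintains a
+        # status flag file in the job directory. For orientation, the parts of
+        # its interface used in this function (a sketch based only on this
+        # file's usage, not on the flagdir documentation):
+        #
+        #     status.had_initial_status  # a flag file already existed on entry
+        #     status.status              # e.g. 'INPROGRESS', 'SUCCESS', 'ERROR'
+        #     status.reset('ERROR')      # overwrite the recorded status
+        #     status.is_success()        # True if the status is 'SUCCESS'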
+
+        for configIndex, configtemplate in enumerate(config_paths):
+
+            config_paths_length: int = len(config_paths)
+            logger.info(f'Working on config {configIndex+1} of {config_paths_length}')
+
+            try:
+                configjson = open_and_check_config(configtemplate)
+            except:
+                logger.exception(f"Failure in opening or checking config {configtemplate}")
+                # TODO: This case should test flagdir.jobStatus.__exit__()
+                raise  # endJob('ERROR',premature=True)
+
+            # provide specific case details to the template config
+
+            configjson['StartTime'] = startTime.strftime('%Y-%m-%d-%H%M')
+            configjson['StartString'] = start_date
+
+            # from configtemplate, create configFileName to describe the specific job
+            configFileName = f"{os.path.basename(configtemplate).replace('.json', '')}_{component}"
+
+            configjson['ConfigFilePath'] = configFileName
+
+            # write the complete configuration file to the job directory
+            with open(f"{jobPath}/{configFileName}.json", 'w') as write_file:
+                json.dump(configjson, write_file, indent=4)
+
+            # run the EWS-plotting command
+
+            proc_description = universal_config['ProcessEWSPlotting']
+            try:
+                EWSPlottingOutputs = process_EWS_plotting(jobPath, configjson)
+            except:
+                logger.exception(f"Error in {proc_description}()")
+                status.reset('ERROR')
+                endJob(status, premature=True)
+
+            logger.info('Finished with EWS-Plotting, appending images to list for transfer')
+
+            if EWSPlottingOutputs:
+
+                append_item_to_list(
+                    EWSPlottingOutputs,
+                    FilesToSend,
+                    proc_description,
+                    status)
+
+            logger.info(f'Finished with config {configIndex+1} of {config_paths_length}')
+
+        # send results to the remote server
+
+        if not noupload:
+            try:
+                ProcessorComponents.upload(universal_config, FilesToSend, component)
+            except IndexError:
+                status.reset('WARNING')
+            except:
+                logger.exception('Failed to upload files to remote server')
+                status.reset('ERROR')
+                endJob(status, premature=True)
+
+            # check if there is a second location on willow to provide results
+            if 'ServerPathExtra' in configjson[component]:
+
+                logger.info('There is an extra path to send results to:')
+
+                extra_path = configjson[component]['ServerPathExtra']
+
+                logger.info(extra_path)
+
+                universal_config_extra = universal_config.copy()
+                universal_config_extra['ServerPath'] = extra_path
+
+                try:
+                    ProcessorComponents.upload(universal_config_extra, FilesToSend, component)
+                except IndexError:
+                    status.reset('WARNING')
+                except:
+                    logger.exception('Failed to upload files to extra directory on remote server')
+                    status.reset('ERROR')
+                    endJob(status, premature=True)
+
+        else:
+            logger.info('Because the noupload argument was present, not sending results to the remote server')
+
+        status.reset('SUCCESS')
+
+    if status.is_success() and clearup:
+
+        logger.info('Clearing up')
+
+        clearup_dest_dir = f"{workspacePath}/clearup/{short_name[component]}_{start_date}/"
+        Path(clearup_dest_dir).mkdir(parents=True, exist_ok=True)
+
+        logger.info(f"While developing, moving directories to this directory: {clearup_dest_dir}")
+
+        clear_up(paths_to_clear, clearup_dest=clearup_dest_dir)
+
+    endScript(premature=False)
+
+
+if __name__ == '__main__':
+    try:
+        logger.info("==========")
+        logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
+        # load configurations
+        args_dict: dict = parse_and_check_args(todayString)
+        set_log_level(args_dict['log_level'])
+        run_Process(args_dict)
+    except SystemExit:
+        logger.info('run_Process() exited')
+    except:
+        logger.exception('Uncaught exception in run_Process:')
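+
+# The module can also be driven programmatically. A hypothetical sketch, using
+# the same keys that run_Process() reads from its args dict (paths and values
+# are placeholders):
+#
+#     from ProcessorOnlyPlot import run_Process
+#     run_Process({
+#         'component': 'Deposition',
+#         'config_paths': ['config_Nepal_template.json'],
+#         'start_date': '20200715',
+#         'live': False,
+#         'noupload': True,
+#         'clearup': False,
+#     })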
diff --git a/coordinator/ProcessorSurveysWRT.py b/coordinator/ProcessorSurveysWRT.py
index d2a50c2ddeead781ea13cf8295cebb5a65b713bd..be12633ad4bfe20b7021e7918239e3279e4971c4 100644
--- a/coordinator/ProcessorSurveysWRT.py
+++ b/coordinator/ProcessorSurveysWRT.py
@@ -179,7 +179,6 @@ def nested_to_flattened(df):
             row[nested_row[rr]['DiseaseName'] + '.Incident'] = nested_row[rr]['IncidenceCategory']
             row[nested_row[rr]['DiseaseName'] + '.Severity'] = nested_row[rr]['SeverityCategory']
-            nested_row[rr]['listResult'] = [{'Race': 'Alma', 'Genotype': 'Korte'},{'Race': 'Banan', 'Genotype': 'Malna'}] # !!!!!!!!! DELETE THIS LINE !!!!!!!!!!
             for i in range(len(nested_row[rr]['listResult'])):
                 # TODO: check if the separation symbol is in the string or not
                 row[nested_row[rr]['DiseaseName'] + '.Race'] += nested_row[rr]['listResult'][i]['Race']