diff --git a/coordinator/ProcessorOnlyPlot.py b/coordinator/ProcessorOnlyPlot.py
deleted file mode 100644
index 6e034912252d5a5181d3d0ea8cb118dea2e84bf8..0000000000000000000000000000000000000000
--- a/coordinator/ProcessorOnlyPlot.py
+++ /dev/null
@@ -1,565 +0,0 @@
-#Processor.py
-'''To be used for handling any operational component of the wheat rust early
-warning system.
-
-Command-line options specify which component and which json configuration
-files to run, and complete any remaining elements of the configuration files.
-
-The component varies by the choices of process_pre_job(), process_in_job() and
-process_EWS_plotting().
-
-This runs all dates of a single forecast.
-
-Example usage::
-    $ conda activate py3EWSepi
-    $ python Processor.py -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
-or::
-    $ ./run_Processor.sh -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
-'''
-from typing import List
-
-
-print("Make sure to `conda activate py3EWSepi` environment!")
-print("Make sure that the flagdir package is available (on PYTHONPATH)")
-
-import argparse
-import datetime
-import json
-import logging
-import logging.config
-import os
-from pathlib import Path
-import shutil
-import sys
-
-# gitlab projects
-from flagdir import jobStatus # created by jws52
-
-# submodules of this project
-import BufferingSMTPHandler
-import ProcessorComponents
-from ProcessorUtils import (
-    append_item_to_list,
-    clear_up,
-    endScript,
-    endJob,
-    open_and_check_config,
-    PasswordODKFilter,
-    short_name
-)
-
-# initialise default values for configuration
-
-script_name = 'Processor'
-
-timeNow = datetime.datetime.today()
-dateToday = timeNow.date()
-todayString = timeNow.strftime('%Y%m%d')
-nowString = timeNow.strftime('%Y%m%d-%H%M-%S')
-
-# get the path to this script
-script_path = os.path.dirname(__file__)+'/'
-
-coordinator_path = script_path
-
-# log file for all jobs
-log_path_project = f"{coordinator_path}logs/log.txt"
-
-# the job-specific log file will be written here until a job directory exists, when it will be moved there
-log_path_default = f"{coordinator_path}logs/log_{nowString}.txt"
-
-# get the email credentials file path from the environment variables
-assert 'EMAIL_CRED' in os.environ
-email_credential_fn = os.environ['EMAIL_CRED']
-assert os.path.exists(email_credential_fn)
-
-with open(email_credential_fn,'r') as f:
-    gmail_config = json.load(f)
-
-# check contents
-required_keys = ['user','pass','host','port','toaddrs']
-for required_key in required_keys:
-    assert required_key in gmail_config
-
-# load config from python dictionary (could be loaded from json file)
-# TODO: the smtp handler can only use TLS, but SSL is more secure. Look into defining/writing a suitable smtp handler
-logConfigDict = {
-    'version' : 1,
-    'disable_existing_loggers': False,
-    'formatters' : {
-        'simple' : {
-            'format' : '%(name)s : %(levelname)s - %(message)s'
-        },
-        'detailed' : {
-            'format' : f"""
-                For command:
-                {' '.join(sys.argv)}
-
-                %(levelname)s in %(name)s encountered at %(asctime)s:
-
-                %(message)s
-
-                Resolve this error and restart processing.
-
-                """,
-            'datefmt' : '%Y-%m-%d %H:%M:%S'
-        }
-    },
-    'filters' : {
-        'mask_passwords' : {
-            '()' : PasswordODKFilter
-        }
-    },
-    'handlers' : {
-
-        # logging for project
-        'handler_rot_file' : {
-            'class' : 'logging.handlers.TimedRotatingFileHandler',
-            'level' : 'INFO',
-            'formatter' : 'simple',
-            'filters' : ['mask_passwords'],
-            'filename' : log_path_project,
-            # time of day on given day
-            'when' : 'W2', # rotate file every Wednesday
-            'atTime' : datetime.time(1,0,0), # at 1am local time
-            'backupCount' : 12,
-        },
-
-        # logging for job
-        'handler_file' : {
-            'class' : 'logging.FileHandler',
-            'level' : 'INFO',
-            'formatter' : 'simple',
-            'filters' : ['mask_passwords'],
-            'filename' : log_path_default,
-            'mode' : 'a', # 'a' for append
-        },
-        # to email errors to maintainers
-        'handler_buffered_email': {
-            'class': 'BufferingSMTPHandler.BufferingSMTPHandler',
-            'level': 'ERROR',
-            'server': (gmail_config['host'], gmail_config['port']), # host, port. 465 for SSL, 587 for TLS
-            'credentials': (gmail_config['user'], gmail_config['pass']),
-            'fromaddr': gmail_config['user'],
-            'toaddrs': gmail_config['toaddrs'],
-            'subject': 'ERROR in EWS Processor',
-            'formatter': 'detailed',
-            'filters': ['mask_passwords'],
-            'capacity': 100
-        },
-    },
-    'loggers' : {
-        # this is activated when this script is imported,
-        # i.e. with logging.getLogger('Processor')
-        script_name : {
-            'level' : 'INFO',
-            'handlers' : ['handler_rot_file','handler_file','handler_buffered_email'],
-            'propagate' : True,
-        },
-        # this is activated when this script is called on the command line
-        # or from a bash script,
-        # i.e. with logging.getLogger(__name__) when __name__ == '__main__'
-        '__main__' : {
-            'level' : 'INFO',
-            'handlers' : ['handler_rot_file','handler_file','handler_buffered_email'],
-            'propagate' : True,
-        }
-    }
-}
-
-logging.config.dictConfig(logConfigDict)
-
-print(__name__)
-# create a logger named according to how the file is called
-#logger = logging.getLogger(__name__)
-logger = logging.getLogger(script_name)
-
-loglevels = {'debug':logging.DEBUG,
-             'info':logging.INFO,
-             'warning':logging.WARNING,
-             'error':logging.ERROR,
-             'critical':logging.CRITICAL,
-             }
-
-def move_default_logfile_handler(dstPathName,srcPathName=log_path_default,FileHandlerName='handler_file',):
-    '''For an on-the-fly move of logging from the default file to another. Copies the
-    contents of the source log file to the destination, switches the file handler in
-    the logger, then removes the source log file.'''
-
-    logger.info(f"Moving logfile location from:\n{srcPathName}\nto:\n{dstPathName}")
-
-    # copy old log file to new filename
-    srcPath = Path(srcPathName)
-    dstPath = Path(dstPathName)
-    assert srcPath.exists()
-    assert dstPath.parent.is_dir()
-
-    oldFileHandler = [h for h in logger.handlers if h.name==FileHandlerName][0]
-    oldFormatter = oldFileHandler.formatter
-
-    # define new file handler
-    newfilehandler = logging.FileHandler(dstPath,mode=oldFileHandler.mode)
-    newfilehandler.setLevel(oldFileHandler.level)
-    newfilehandler.setFormatter(oldFormatter)
-
-    shutil.copyfile(srcPath,dstPath)
-
-    # add handler for destination file
-    logger.info('Adding new logging file handler to destination path')
-
-    logger.addHandler(newfilehandler)
-
-    # remove handler for source file
-    logger.info('Stopping write to old file handler')
-    logger.removeHandler(oldFileHandler)
-    oldFileHandler.close()
-    logger.info('Successfully stopped write to old file handler')
-
-    # delete old log file
-    logger.info('Deleting old log file, since all content is available in the new log file stream')
-    os.rename(srcPathName,srcPathName+'removed')
-
-    return
-
-
-def parse_and_check_args(todayString) -> dict:
-
-    # define the command-line arguments
-    my_parser = argparse.ArgumentParser(description='Command-line arguments for coordinator script of env suitability model')
-
-    # Add the arguments
-    # positional arguments do not start with - or -- and are always required
-    # optional arguments start with - or -- and default to required = False
-
-    my_parser.add_argument(
-        '-p', '--component',
-        type = str,
-        choices = list(short_name.keys()),
-        required = True,
-        dest = 'component',
-        help = '''Name of the EWS component to process, which must be present
-        in the config file.''')
-
-    my_parser.add_argument(
-        '-c', '--config',
-        metavar = 'path',
-        type = str,
-        nargs = '+', # require at least one path
-        dest = 'config_paths',
-        required = True,
-        #default = ['config_Ethiopia_template_stripe.json'], # remove once live
-        #default = ['config_Bangladesh_template_stripe.json'], # remove once live
-        #default = ['config_Nepal_template_stripe.json'], # remove once live
-        help = '''path to one or more config files. When more than one is provided,
-        each is worked on in turn (e.g. one each for stripe, stem, leaf).
-        Do not place other options between these.''')
-
-    my_parser.add_argument(
-        '-l','--loglevel',
-        action = 'store',
-        choices = list(loglevels.keys()),
-        default = 'info',
-        help = 'verbosity of log messaging (debug, info, warning, error, critical)\n default is info',
-        dest = 'log_level', # this names the attribute that will be parsed
-    )
-
-    my_parser.add_argument(
-        '--islive',
-        action = 'store_true',
-        help = 'if live, email messages are sent to maintainers for warnings and errors',
-        dest = 'live',
-    )
-
-    my_parser.add_argument(
-        '-s','--start-date','-i','--initial-date',
-        metavar = 'YYYYMMDD',
-        action = 'store',
-        default = todayString,
-        help = 'initial day of calculation, starting at 00 UTC (default is today)',
-        dest = 'start_date',
-    )
-
-    my_parser.add_argument(
-        '--noupload',
-        action = 'store_true',
-        help = 'if present, results of the script are not saved to the willow public directory'
-    )
-
-    my_parser.add_argument(
-        '--clearup',
-        action = 'store_true',
-        help = 'whether to delete mid-process files at the end of a successful job',
-        dest = 'clearup',
-    )
-
-    # get an object holding all of the args
-    args = my_parser.parse_args()
-
-    # Check the args
-
-    logger.info(f"Command-line options are:\n{args}")
-
-    if not isinstance(args.config_paths,list):
-        logger.error('Expecting a list of config paths')
-        raise RuntimeError
-
-    # check the start string
-    if args.start_date != todayString:
-        try:
-            # check the date string is formatted correctly
-            provided_start_date = datetime.datetime.strptime(args.start_date,'%Y%m%d')
-            today_date = datetime.datetime.strptime(todayString,'%Y%m%d')
-
-            # the early limit is quite arbitrary, but this is the earliest year of available survey data for Ethiopia
-            date_limit_early = datetime.datetime.strptime('20070101','%Y%m%d')
-            assert date_limit_early < provided_start_date
-            assert provided_start_date <= today_date
-
-        except (ValueError, AssertionError) as e:
-            logger.exception("Provided start date string is formatted incorrectly or out of range")
-            raise
-
-    dictionary: dict = vars(args)
-    return dictionary
-
-
-def set_log_level(log_level: str):
-    new_log_level = loglevels[log_level]
-
-    # modify the log level of all loggers
-    logger.info(f"logging level being changed to {new_log_level} because of command-line option")
-    loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
-    for logger_i in loggers: logger_i.setLevel(new_log_level)
-
-
-def build_universal_config(configs: list,component: str, universal_config=None):
-    '''Gathers the aspects of each config dict in configs that must be common to
-    them all.
-    '''
-
-    # initialise universal config
-    if not universal_config:
-        universal_config = {
-            'WorkspacePathout' : set(),
-            'ProcessPreJob' : set(),
-            'ProcessInJob' : set(),
-            'ProcessEWSPlotting' : set(),
-            'ServerPath' : set(),
-            'ServerName' : set(),
-            'ServerKey' : set()}
-
-    keys = universal_config.keys()
-
-    # get the value of each key from each config file
-    for configFile in configs:
-
-        try:
-            config_i = open_and_check_config(configFile)
-
-        except:
-            logger.exception(f"Failure in opening or checking config {configFile}")
-            endScript(premature=True)
-
-        for key in keys:
-
-            try:
-                universal_config[key].add(config_i[key])
-
-            except KeyError:
-                # key must be in the component sub-dict
-                universal_config[key].add(config_i[component][key])
-
-    # check for and keep only one value per key
-    for key in keys:
-
-        if len(universal_config[key]) > 1:
-            logger.error(f"Config files point to multiple {key} but this script can only handle one.")
-            endScript(premature=True)
-
-        universal_config[key] = universal_config[key].pop()
-
-    return universal_config
-
-
-def run_Process(args: dict):
-
-    # check the initial state of each config file, and gather terms that must
-    # apply across all provided configs
-
-    if not args["live"]:
-        # remove the log handler that would send emails
-        logger.handlers = [h for h in logger.handlers if not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]
-
-    config_paths: List[str] = args['config_paths']
-    component: str = args['component']
-    start_date: str = args['start_date']
-    noupload: bool = args['noupload']
-    clearup: bool = args['clearup']
-
-    universal_config = build_universal_config(config_paths, component)
-
-    universal_config['StartString'] = start_date
-
-    logger.info(f"Universal config is\n{json.dumps(universal_config,indent=2)}")
-
-    workspacePath = universal_config['WorkspacePathout']
-
-    process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting'])
-
-    # determine the job directory
-    jobPath = f'{workspacePath}{short_name[component]}_{start_date}'
-
-    logger.info(f"Job path will be {jobPath}")
-
-    # note that the file date represents the preceding 3 hours, so the day's data starts at file timestamp 0300 UTC
-    startTime = datetime.datetime.strptime(start_date+'03','%Y%m%d%H')
-
-    # create the job directory
-    Path(jobPath).mkdir(parents=True, exist_ok=True)
-
-    # lock the job directory
-    with jobStatus(jobPath) as status:
-
-        # check for a status file in the job directory
-        if status.had_initial_status:
-            logger.info(f"Job path already exists and has status {status.status}")
-
-            endScript(premature = status.status not in ['SUCCESS','INPROGRESS'])
-
-        logger.info(f"Current status of job directory is {status.status}")
-
-        # now that we have a usable job directory, move the log file there
-        logPathJob = f"{jobPath}/log.txt"
-
-        move_default_logfile_handler(dstPathName=logPathJob)
-
-        # files and directories that will be uploaded to the public server
-        FilesToSend = []
-
-        # files and directories that will be earmarked for removal after a
-        # successful job
-        paths_to_clear = []
-
-        logger.info('Starting to work on each configuration')
-
-        for configIndex, configtemplate in enumerate(config_paths):
-
-            config_paths_length: int = len(config_paths)
-            logger.info(f'Working on config {configIndex+1} of {config_paths_length}')
-
-            try:
-                configjson = open_and_check_config(configtemplate)
-            except:
-                logger.exception(f"Failure in opening or checking config {configtemplate}")
-                # TODO: This case should test flagdir.jobStatus.__exit__()
-                raise # endJob('ERROR',premature=True)
-
-            # provide specific case details to the template config
-
-            configjson['StartTime'] = startTime.strftime('%Y-%m-%d-%H%M')
-            configjson['StartString'] = start_date
-
-            # from configtemplate create a configFileName to describe the specific job
-            configFileName = f"{os.path.basename(configtemplate).replace('.json','')}_{component}"
-
-            configjson['ConfigFilePath'] = configFileName
-
-            # write the complete configuration file to the job directory
-            with open(f"{jobPath}/{configFileName}.json",'w') as write_file:
-                json.dump(configjson,write_file,indent=4)
-
-            # run the EWS-plotting command
-
-            proc_description = universal_config['ProcessEWSPlotting']
-            try:
-                EWSPlottingOutputs = process_EWS_plotting(jobPath,configjson)
-            except:
-                logger.exception(f"Error in {proc_description}()")
-                status.reset('ERROR')
-                endJob(status,premature=True)
-
-            logger.info('Finished with EWS-Plotting, appending images to list for transfer')
-
-            if EWSPlottingOutputs:
-
-                append_item_to_list(
-                    EWSPlottingOutputs,
-                    FilesToSend,
-                    proc_description,
-                    status)
-
-            logger.info(f'Finished with config {configIndex+1} of {config_paths_length}')
-
-        # send results to the remote server
-
-        if not noupload:
-            try:
-                ProcessorComponents.upload(universal_config, FilesToSend, component)
-            except IndexError:
-                status.reset('WARNING')
-
-            except:
-                logger.exception('Failed to upload files to remote server')
-                status.reset('ERROR')
-                endJob(status,premature=True)
-
-            # check if there is a second location on willow to provide results to
-            if 'ServerPathExtra' in configjson[component]:
-
-                logger.info('There is an extra path to send results to:')
-
-                extra_path = configjson[component]['ServerPathExtra']
-
-                logger.info(extra_path)
-
-                universal_config_extra = universal_config.copy()
-                universal_config_extra['ServerPath'] = extra_path
-
-                try:
-                    ProcessorComponents.upload(universal_config_extra, FilesToSend, component)
-                except IndexError:
-                    status.reset('WARNING')
-
-                except:
-                    logger.exception('Failed to upload files to extra directory on remote server')
-                    status.reset('ERROR')
-                    endJob(status,premature=True)
-
-        else:
-            logger.info('Because the noupload argument was present, not sending results to the remote server')
-
-        status.reset('SUCCESS')
-
-        if status.is_success() and clearup:
-
-            logger.info('Clearing up')
-
-            clearup_dest_dir = f"{workspacePath}/clearup/{short_name[component]}_{start_date}/"
-            Path(clearup_dest_dir).mkdir(parents=True, exist_ok=True)
-
-            logger.info(f"While developing, moving directories to this directory : {clearup_dest_dir}")
-
-            clear_up(paths_to_clear, clearup_dest=clearup_dest_dir)
-
-    endScript(premature=False)
-
-
-if __name__ == '__main__':
-    try:
-        logger.info("==========")
-        logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
-        # load configurations
-        args_dict: dict = parse_and_check_args(todayString)
-        set_log_level(args_dict['log_level'])
-        run_Process(args_dict)
-    except SystemExit:
-        logger.info('run_Process() exited')
-    except:
-        logger.exception('Uncaught exception in run_Process:')
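
The TODO near the top of the deleted file notes that the buffering SMTP handler can only use TLS (STARTTLS) and asks for an SSL-capable alternative. A minimal sketch of what that could look like, built only on the standard library; the class name BufferingSMTPSSLHandler is hypothetical, and its constructor arguments simply mirror the 'handler_buffered_email' entry in the dictConfig above:

    import logging.handlers
    import smtplib
    from email.message import EmailMessage

    class BufferingSMTPSSLHandler(logging.handlers.BufferingHandler):
        '''Buffer log records and email them in one message over implicit SSL.'''

        def __init__(self, server, credentials, fromaddr, toaddrs, subject, capacity=100):
            super().__init__(capacity)
            self.host, self.port = server          # e.g. (host, 465) for SSL
            self.username, self.password = credentials
            self.fromaddr = fromaddr
            self.toaddrs = toaddrs
            self.subject = subject

        def flush(self):
            if not self.buffer:
                return
            try:
                msg = EmailMessage()
                msg['From'] = self.fromaddr
                msg['To'] = ', '.join(self.toaddrs)
                msg['Subject'] = self.subject
                msg.set_content('\r\n'.join(self.format(r) for r in self.buffer))
                # SMTP_SSL encrypts from the first byte (port 465), rather than
                # upgrading a plaintext connection with starttls() (port 587)
                with smtplib.SMTP_SSL(self.host, self.port) as smtp:
                    smtp.login(self.username, self.password)
                    smtp.send_message(msg)
            except Exception:
                self.handleError(self.buffer[-1])
            finally:
                self.buffer = []

Such a handler could be registered in the dictConfig the same way as the existing one, with 'server' pointing at port 465.
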
diff --git a/coordinator/ProcessorSurveysWRT.py b/coordinator/ProcessorSurveysWRT.py
index be12633ad4bfe20b7021e7918239e3279e4971c4..d2a50c2ddeead781ea13cf8295cebb5a65b713bd 100644
--- a/coordinator/ProcessorSurveysWRT.py
+++ b/coordinator/ProcessorSurveysWRT.py
@@ -179,6 +179,7 @@ def nested_to_flattened(df):
             row[nested_row[rr]['DiseaseName'] + '.Incident'] = nested_row[rr]['IncidenceCategory']
             row[nested_row[rr]['DiseaseName'] + '.Severity'] = nested_row[rr]['SeverityCategory']
+            nested_row[rr]['listResult'] = [{'Race': 'Alma', 'Genotype': 'Korte'},{'Race': 'Banan', 'Genotype': 'Malna'}] # !!!!!!!!! DELETE THIS LINE !!!!!!!!!!
             for i in range(len(nested_row[rr]['listResult'])):
                 # TODO: check if the separation symbol is in the string or not
                 row[nested_row[rr]['DiseaseName'] + '.Race'] += nested_row[rr]['listResult'][i]['Race']
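
The added line above hardcodes a two-entry listResult, and is explicitly flagged for deletion, presumably so the flattening loop after it can be exercised without live survey data. A self-contained sketch of what that loop produces for the stubbed values; the disease name 'stem_rust', the ';' separator, and the empty-string initialisation of the row columns are illustrative assumptions, not taken from ProcessorSurveysWRT.py:

    # Stub records matching the hardcoded test line in the hunk above.
    list_result = [{'Race': 'Alma', 'Genotype': 'Korte'},
                   {'Race': 'Banan', 'Genotype': 'Malna'}]

    # Hypothetical flattened row; the real code derives the column prefix
    # from nested_row[rr]['DiseaseName'].
    row = {'stem_rust.Race': '', 'stem_rust.Genotype': ''}

    for i, entry in enumerate(list_result):
        sep = ';' if i > 0 else ''  # join entries without a leading separator
        row['stem_rust.Race'] += sep + entry['Race']
        row['stem_rust.Genotype'] += sep + entry['Genotype']

    print(row)
    # {'stem_rust.Race': 'Alma;Banan', 'stem_rust.Genotype': 'Korte;Malna'}

The remaining validation step, per the TODO in the hunk, would be checking whether the separation symbol already appears inside a value before joining.
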