Commit 637d851f authored by Dr T. Mona

Revert "fix: remove debug line"

This reverts commit 1f935d5f.
parent 1f935d5f
# Processor.py
'''To be used for handling any operational component of the wheat rust early
warning system.

Command-line options specify which component to run and which JSON
configuration files to use, and complete any remaining elements of the
configuration files.

The component is determined by the choices of process_pre_job(),
process_in_job() and process_EWS_plotting().

This runs all dates of a single forecast.

Example usage::
    $ conda activate py3EWSepi
    $ python Processor.py -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
or::
    $ ./run_Processor.sh -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
'''
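# Note on config contents (an illustrative sketch only; exact keys and values
# are deployment-specific): build_universal_config() below expects each config,
# either at its top level or within the chosen component's sub-dict, to provide
# at least the keys WorkspacePathout, ProcessPreJob, ProcessInJob,
# ProcessEWSPlotting, ServerPath, ServerName and ServerKey, where entries such
# as ProcessEWSPlotting name functions looked up in ProcessorComponents.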
from typing import List
print("Make sure to `conda activate py3EWSepi` environment!")
print("Make sure that flagdir package is available (on PYTHONPATH)")
import argparse
import datetime
import json
import logging
import logging.config
import os
from pathlib import Path
import shutil
import sys
# gitlab projects
from flagdir import jobStatus # created by jws52
# submodules of this project
import BufferingSMTPHandler
import ProcessorComponents
from ProcessorUtils import (
        append_item_to_list,
        clear_up,
        endScript,
        endJob,
        open_and_check_config,
        PasswordODKFilter,
        short_name
)
# initialise default values for configuration
script_name = 'Processor'
timeNow = datetime.datetime.today()
dateToday = timeNow.date()
todayString = timeNow.strftime('%Y%m%d')
nowString = timeNow.strftime('%Y%m%d-%H%M-%S')
# get the path to this script
script_path = os.path.dirname(__file__)+'/'
coordinator_path = script_path
# log file for all jobs
log_path_project = f"{coordinator_path}logs/log.txt"
# job-specific log file will be written here until a job directory exists, when it will be moved there
log_path_default = f"{coordinator_path}logs/log_{nowString}.txt"
# get the email credentials file path from the environment variables
assert 'EMAIL_CRED' in os.environ
email_credential_fn = os.environ['EMAIL_CRED']
assert os.path.exists(email_credential_fn)
with open(email_credential_fn,'r') as f:
    gmail_config = json.load(f)

# check contents
required_keys = ['user','pass','host','port','toaddrs']
for required_key in required_keys:
    assert required_key in gmail_config
# load config from python dictionary (could be loaded from json file)
# TODO: smtp handler can only use tls, but ssl is more secure. Look into defining/writing a suitable smtp handler
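# (One possible direction, not implemented here: open the SMTP connection with
# smtplib.SMTP_SSL(host, port) for implicit SSL on port 465, rather than
# smtplib.SMTP followed by starttls().)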
logConfigDict = {
    'version' : 1,
    'disable_existing_loggers': False,
    'formatters' : {
        'simple' : {
            'format' : '%(name)s : %(levelname)s - %(message)s'
        },
        'detailed' : {
            'format' : f"""
For command:
{' '.join(sys.argv)}
%(levelname)s in %(name)s encountered at %(asctime)s:
%(message)s
Resolve this error and restart processing.
""",
            'datefmt' : '%Y-%m-%d %H:%M:%S'
        }
    },
    'filters' : {
        'mask_passwords' : {
            '()' : PasswordODKFilter
        }
    },
    'handlers' : {
        # logging for project
        'handler_rot_file' : {
            'class' : 'logging.handlers.TimedRotatingFileHandler',
            'level' : 'INFO',
            'formatter' : 'simple',
            'filters' : ['mask_passwords'],
            'filename' : log_path_project,
            # rotate at a given time of day on a given day
            'when' : 'W2', # rotate file every Wednesday
            'atTime' : datetime.time(1,0,0), # at 1am local time
            'backupCount' : 12,
        },
        # logging for job
        'handler_file' : {
            'class' : 'logging.FileHandler',
            'level' : 'INFO',
            'formatter' : 'simple',
            'filters' : ['mask_passwords'],
            'filename' : log_path_default,
            'mode' : 'a', # 'a' for append
        },
        # to email errors to maintainers
        'handler_buffered_email': {
            'class': 'BufferingSMTPHandler.BufferingSMTPHandler',
            'level': 'ERROR',
            'server': (gmail_config['host'], gmail_config['port']), # host, port: 465 for SSL, 587 for TLS
            'credentials': (gmail_config['user'], gmail_config['pass']),
            'fromaddr': gmail_config['user'],
            'toaddrs': gmail_config['toaddrs'],
            'subject': 'ERROR in EWS Processor',
            'formatter': 'detailed',
            'filters': ['mask_passwords'],
            'capacity': 100
        },
    },
    'loggers' : {
        # this logger is used when this script is imported,
        # i.e. with logging.getLogger('Processor')
        script_name : {
            'level' : 'INFO',
            'handlers' : ['handler_rot_file','handler_file','handler_buffered_email'],
            'propagate' : True,
        },
        # this logger is used when this script is called on the command line
        # or from a bash script,
        # i.e. with logging.getLogger(__name__) when __name__ == '__main__'
        '__main__' : {
            'level' : 'INFO',
            'handlers' : ['handler_rot_file','handler_file','handler_buffered_email'],
            'propagate' : True,
        }
    }
}
logging.config.dictConfig(logConfigDict)
print(__name__)
# create a logger named according to how the file is called
#logger = logging.getLogger(__name__)
logger = logging.getLogger(script_name)
loglevels = {'debug':logging.DEBUG,
             'info':logging.INFO,
             'warning':logging.WARNING,
             'error':logging.ERROR,
             'critical':logging.CRITICAL,
             }
def move_default_logfile_handler(dstPathName,srcPathName=log_path_default,FileHandlerName='handler_file',):
    '''For on-the-fly move of logging from default file to another. Copies the
    contents of the source log file to destination, switches file handler in
    logger, then removes source log file.'''

    logger.info(f"Moving logfile location from:\n{srcPathName}\nto:\n{dstPathName}")

    # copy old log file to new filename
    srcPath = Path(srcPathName)
    dstPath = Path(dstPathName)

    assert srcPath.exists()
    assert dstPath.parent.is_dir()

    oldFileHandler = [h for h in logger.handlers if h.name==FileHandlerName][0]
    oldFormatter = oldFileHandler.formatter

    # define new file handler
    newfilehandler = logging.FileHandler(dstPath,mode=oldFileHandler.mode)
    newfilehandler.setLevel(oldFileHandler.level)
    newfilehandler.setFormatter(oldFormatter)

    shutil.copyfile(srcPath,dstPath)

    # add handler for destination file
    logger.info('Adding new logging file handler to destination path')
    logger.addHandler(newfilehandler)

    # remove handler for source file
    logger.info('Stopping write to old file handler')
    logger.removeHandler(oldFileHandler)
    oldFileHandler.close()
    logger.info('Successfully stopped write to old file handler')

    # delete old log file
    logger.info('Deleting old log file, since all content available in new log file stream')
    os.rename(srcPathName,srcPathName+'removed')

    return
def parse_and_check_args(todayString) -> dict:

    # define the command line arguments
    my_parser = argparse.ArgumentParser(description='Command-line arguments for coordinator script of env suitability model')

    # Add the arguments
    # positional arguments do not start with - or -- and are always required
    # optional arguments start with - or -- and default is required = False

    my_parser.add_argument(
            '-p', '--component',
            type = str,
            choices = list(short_name.keys()),
            required = True,
            dest = 'component',
            help = '''Name of EWS component to process, which must be present
            in the config file.''')

    my_parser.add_argument(
            '-c', '--config',
            metavar = 'path',
            type = str,
            nargs = '+', # require at least one path
            dest = 'config_paths',
            required = True,
            #default = ['config_Ethiopia_template_stripe.json'], # remove once live
            #default = ['config_Bangladesh_template_stripe.json'], # remove once live
            #default = ['config_Nepal_template_stripe.json'], # remove once live
            help = '''Path to a config file. More than one can be provided,
            in which case each is worked on in turn (e.g. one each for stripe,
            stem and leaf). Do not place other options between these.''')

    my_parser.add_argument(
            '-l','--loglevel',
            action = 'store',
            choices = list(loglevels.keys()),
            default = 'info',
            help = 'verbosity of log messaging (debug, info, warning, error, critical)\n default is info',
            dest = 'log_level', # this names the attribute that will be parsed
            )

    my_parser.add_argument(
            '--islive',
            action = 'store_true',
            help = 'If live, email messages are sent to maintainers for warnings and errors',
            dest = 'live',
            )

    my_parser.add_argument(
            '-s','--start-date','-i','--initial-date',
            metavar = 'YYYYMMDD',
            action = 'store',
            default = todayString,
            help = 'Initial day of calculation, starting at 00 UTC (default is today)',
            dest = 'start_date',
            )

    my_parser.add_argument(
            '--noupload',
            action = 'store_true',
            help = 'if provided, results of the script are not copied to the willow public directory'
            )

    my_parser.add_argument(
            '--clearup',
            action = 'store_true',
            help = 'whether to delete mid-process files at the end of a successful job',
            dest = 'clearup',
            )

    # get an object holding all of the args
    args = my_parser.parse_args()

    # Check the args
    logger.info(f"Command-line options are:\n{args}")

    if not isinstance(args.config_paths,list):
        logger.error('Expecting a list of config paths')
        raise RuntimeError

    # check the start date string
    if args.start_date != todayString:
        try:
            # check date string is formatted correctly
            provided_start_date = datetime.datetime.strptime(args.start_date,'%Y%m%d')
            today_date = datetime.datetime.strptime(todayString,'%Y%m%d')

            # early limit is quite arbitrary, but this is the earliest year of available survey data for Ethiopia
            date_limit_early = datetime.datetime.strptime('20070101','%Y%m%d')
            assert date_limit_early < provided_start_date
            assert provided_start_date <= today_date

        except (ValueError, AssertionError) as e:
            logger.exception("Provided start date string is incorrectly formatted or out of range")
            raise

    dictionary: dict = vars(args)
    return dictionary
def set_log_level(log_level: str):
    new_log_level = loglevels[log_level]

    # modify log level of all loggers
    logger.info(f"logging level being changed to {new_log_level} because of command-line option")
    loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
    for logger_i in loggers:
        logger_i.setLevel(new_log_level)
def build_universal_config(configs: list, component: str, universal_config=None):
    '''Gathers the aspects of each config dict in configs that must be common
    to them all.'''

    # initialise universal config
    if not universal_config:
        universal_config = {
            'WorkspacePathout' : set(),
            'ProcessPreJob' : set(),
            'ProcessInJob' : set(),
            'ProcessEWSPlotting' : set(),
            'ServerPath' : set(),
            'ServerName' : set(),
            'ServerKey' : set()}

    keys = universal_config.keys()

    # get value of each key from each config file
    for configFile in configs:
        try:
            config_i = open_and_check_config(configFile)
        except:
            logger.exception(f"Failure in opening or checking config {configFile}")
            endScript(premature=True)

        for key in keys:
            try:
                universal_config[key].add(config_i[key])
            except KeyError:
                # key must be in component sub-dict
                universal_config[key].add(config_i[component][key])

    # Check for and keep only one value per key
    for key in keys:
        if len(universal_config[key]) > 1:
            logger.error(f"Config files point to multiple {key} but this script can only handle one.")
            endScript(premature=True)

        universal_config[key] = universal_config[key].pop()

    return universal_config
def run_Process(args: dict):
    # check initial state of each config file, and gather terms that must apply
    # across all provided configs

    if not args["live"]:
        # remove the log handler that would send emails
        logger.handlers = [h for h in logger.handlers if not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]

    config_paths: List[str] = args['config_paths']
    component: str = args['component']
    start_date: str = args['start_date']
    noupload: bool = args['noupload']
    clearup: bool = args['clearup']

    universal_config = build_universal_config(config_paths, component)
    universal_config['StartString'] = start_date

    logger.info(f"Universal config is\n{json.dumps(universal_config,indent=2)}")

    workspacePath = universal_config['WorkspacePathout']

    process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting'])

    # determine job directory
    jobPath = f'{workspacePath}{short_name[component]}_{start_date}'
    logger.info(f"Job path will be {jobPath}")

    # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC
    startTime = datetime.datetime.strptime(start_date+'03','%Y%m%d%H')

    # create job directory
    Path(jobPath).mkdir(parents=True, exist_ok=True)

    # lock job directory
    with jobStatus(jobPath) as status:

        # lawrence comment in/out
        # check for a status file in job directory
        if status.had_initial_status:
            logger.info(f"Job path already exists and has status {status.status}")
            endScript(premature = status.status not in ['SUCCESS','INPROGRESS'])

        logger.info(f"Current status of job directory is {status.status}")

        # now that we have a usable job directory, move the log file there
        logPathJob = f"{jobPath}/log.txt"
        move_default_logfile_handler(dstPathName=logPathJob)

        # files and directories that will be uploaded to public server
        FilesToSend = []

        # files and directories that will be earmarked for removal after a
        # successful job
        paths_to_clear = []

        logger.info('Starting to work on each configuration')

        for configIndex, configtemplate in enumerate(config_paths):

            config_paths_length: int = len(config_paths)
            logger.info(f'Working on config {configIndex+1} of {config_paths_length}')

            try:
                configjson = open_and_check_config(configtemplate)
            except:
                logger.exception(f"Failure in opening or checking config {configtemplate}")
                # TODO: This case should test flagdir.jobStatus.__exit__()
                raise # endJob('ERROR',premature=True)

            # provide specific case details to template config
            configjson['StartTime'] = startTime.strftime('%Y-%m-%d-%H%M')
            configjson['StartString'] = start_date

            # from configtemplate create configFileName to describe the specific job
            component: str = component
            configFileName = f"{os.path.basename(configtemplate).replace('.json','')}_{component}"

            configjson['ConfigFilePath'] = configFileName

            # write the complete configuration file to job directory
            with open(f"{jobPath}/{configFileName}.json",'w') as write_file:
                json.dump(configjson,write_file,indent=4)

            # Run EWS-plotting command
            proc_description = universal_config['ProcessEWSPlotting']
            try:
                EWSPlottingOutputs = process_EWS_plotting(jobPath,configjson)
            except:
                logger.exception(f"Error in {proc_description}()")
                status.reset('ERROR')
                endJob(status,premature=True)

            logger.info('Finished with EWS-Plotting, appending images to list for transfer')
            if EWSPlottingOutputs:
                append_item_to_list(
                        EWSPlottingOutputs,
                        FilesToSend,
                        proc_description,
                        status)

            logger.info(f'Finished with config {configIndex+1} of {config_paths_length}')

        # send results to remote server
        if not noupload:
            try:
                ProcessorComponents.upload(universal_config, FilesToSend, component)
            except IndexError:
                status.reset('WARNING')
            except:
                logger.exception('Failed to upload files to remote server')
                status.reset('ERROR')
                endJob(status,premature=True)

            # check if there is a second location on willow to provide results
            if 'ServerPathExtra' in configjson[component]:
                logger.info('There is an extra path to send results to:')
                extra_path = configjson[component]['ServerPathExtra']
                logger.info(extra_path)

                universal_config_extra = universal_config.copy()
                universal_config_extra['ServerPath'] = extra_path
                try:
                    ProcessorComponents.upload(universal_config_extra, FilesToSend, component)
                except IndexError:
                    status.reset('WARNING')
                except:
                    logger.exception('Failed to upload files to extra directory on remote server')
                    status.reset('ERROR')
                    endJob(status,premature=True)
        else:
            logger.info('Because noupload argument was present, not sending results to remote server')

        status.reset('SUCCESS')

    if status.is_success() and clearup:
        logger.info('Clearing up')

        clearup_dest_dir = f"{workspacePath}/clearup/{short_name[component]}_{start_date}/"
        Path(clearup_dest_dir).mkdir(parents=True, exist_ok=True)
        logger.info(f"While developing, moving directories to this directory : {clearup_dest_dir}")

        clear_up( paths_to_clear, clearup_dest = clearup_dest_dir)

    endScript(premature=False)
if __name__ == '__main__':
    try:
        logger.info("==========")
        logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")

        # load configurations
        args_dict: dict = parse_and_check_args(todayString)

        set_log_level(args_dict['log_level'])

        run_Process(args_dict)
    except SystemExit:
        logger.info('run_Process() exited')
        pass
    except:
        logger.exception('Uncaught exception in run_Process:')
@@ -179,6 +179,7 @@ def nested_to_flattened(df):
                 row[nested_row[rr]['DiseaseName'] + '.Incident'] = nested_row[rr]['IncidenceCategory']
                 row[nested_row[rr]['DiseaseName'] + '.Severity'] = nested_row[rr]['SeverityCategory']
+                nested_row[rr]['listResult'] = [{'Race': 'Alma', 'Genotype': 'Korte'},{'Race': 'Banan', 'Genotype': 'Malna'}] # !!!!!!!!! DELETE THIS LINE !!!!!!!!!!
                 for i in range(len(nested_row[rr]['listResult'])):
                     # TODO: check if the separation symbol is in the string or not
                     row[nested_row[rr]['DiseaseName'] + '.Race'] += nested_row[rr]['listResult'][i]['Race']