#ProcessorUtils.py
'''Functions called by Processor.py and ProcessorComponents.py .'''

import glob
import json
import logging
import os
import re
from string import Template
import subprocess
import sys
import tarfile

# Mapping from component name to the short name used in output file naming.
short_name = {
        'Advisory' : 'SUMMARY',
        'Deposition' : 'DEPOSITION',
        'Environment' : 'ENVIRONMENT_2.0',
        'Epidemiology' : 'EPI',
        'Survey' : 'SURVEYDATA',
}

# Mapping from disease common name to the latin-name token used elsewhere.
disease_latin_name_dict = {
        'StemRust' : 'P_GRAMINIS',
        'StripeRust' : 'P_STRIIFORMIS',
        'LeafRust' : 'P_RECONDITA',
        'WheatBlast' : 'M_ORYZAE'}

# define logging filter to obscure ODK passwords
def filter_exception_message(exc_info):
    '''To mask out passwords from subprocess input list.

    exc_info is a (type, value, traceback) tuple as attached to a log record.
    If the exception's second arg is a list containing '--odk_password', the
    element following the flag is replaced with '***' in place.'''

    error = exc_info[1]

    # subprocess input appears as a list in the exception's args
    if len(error.args) < 2 or not isinstance(error.args[1], list):
        return

    msg_list = error.args[1]
    pwd_arg = '--odk_password'

    if pwd_arg in msg_list:
        # the password value comes directly after the flag
        passwd_idx = msg_list.index(pwd_arg) + 1
        # guard against the flag being the final element (was an IndexError)
        if passwd_idx < len(msg_list):
            msg_list[passwd_idx] = '***'

class PasswordODKFilter(logging.Filter):
    '''Logging filter that masks ODK passwords in log messages and any
    attached exception info.

    Built with
    https://docs.python.org/3.6/howto/logging-cookbook.html#configuring-filters-with-dictconfig
    '''

    def __init__(self, param=None):
        # example of how to pass parameters
        super().__init__()
        self.param = param

    def filter(self, record):
        '''Mask any password in the record; always returns True so the
        (masked) record is still logged.'''

        # on the primary message
        # raw string so \S is a regex class, not a (deprecated) string escape
        record.msg = re.sub(r"password', '\S+'", "password', '***'", record.msg)

        # on any exception info
        if record.exc_info is not None:
            filter_exception_message(record.exc_info)

        # return boolean to determine whether record gets logged
        return True

def add_filters_to_sublogger(logger_case):
    '''Attach the password-masking filter to every handler of a logger.'''
    for h in logger_case.handlers:
        h.addFilter(PasswordODKFilter())

logger = logging.getLogger('Processor.Utils')
add_filters_to_sublogger(logger)

def open_and_check_config(configFile):
    '''Load a JSON config file, log its contents at debug level, and return
    it as a dict. Re-raises any failure after logging it.'''

    logger.info("Loading config")

    try:
        with open(configFile) as config_file:
            config = json.load(config_file)
    except Exception:
        logger.exception(f"Failure opening config file {configFile}")
        raise

    logger.debug(json.dumps(config, indent=2))

    #TODO: Checking config file for required formatting
    try:
        pass
    except AssertionError:
        logger.exception("Configuration file is not formatted as required")
        raise

    return config

def get_only_existing_globs(file_globs, inplace=True):
    '''Filter a list of glob patterns down to those that match at least one
    existing path, warning about each pattern that matches nothing.

    If inplace is True the input list is modified in place (returns None);
    otherwise a new filtered list is returned.'''

    existing = []
    for fg in file_globs:
        if not glob.glob(fg):
            logger.warning(f'This file pattern returns nothing and therefore does not contribute to result:\n{fg}')
            continue
        existing.append(fg)

    if inplace:
        # slice-assign rather than popping while iterating, which skipped
        # the element after each removed one
        file_globs[:] = existing
        return

    return existing

def subprocess_and_log(cmd, description_short, description_long, check=True, log_type='error', **kwargs):
    '''Run a shell command (described by a comma separated list) and send stdout
    and stderr to logfile, and raise any exception.

    log_type is a string to determine which log message level to use, must be
    one of the logging options: info, debug, warn, warning, error, exception
    or fatal.'''

    # determine message type to write to log
    log_call = getattr(logger, log_type)

    try:
        process = subprocess.run(
                cmd,
                check=check,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                **kwargs)

        # splitlines() splits on real newlines; the old r'\n' split was on a
        # literal backslash-n and never split anything
        for line in process.stdout.decode('utf-8').splitlines():
            logger.info(f"{description_short} : " + line)

    except subprocess.CalledProcessError as e:

        for line in e.stdout.decode('utf-8').splitlines():
            logger.info(f"{description_short} : " + line)

        log_call(f"Some failure when running {description_long}", exc_info=True)
        raise

    return process

def endScript(premature=True):
    '''Log the final status and terminate the script via sys.exit.'''

    if not premature:
        logger.info('Script finished!')
    else:
        logger.info('Script finished prematurely')

    logger.info('--------')
    sys.exit()

def endJob(status, ignore_inprogress=False, **kwargs):
    '''Finalise the job's status object and exit via endScript.

    status is a project status object providing get()/has_status/
    is_inprogress()/reset(); kwargs are forwarded to endScript.'''

    # get status file fresh, and ensure one exists
    status.status = status.get()
    assert status.has_status

    # 'and not' rather than bitwise '&' with '== False'
    if status.is_inprogress() and not ignore_inprogress:
        logger.warning(f'Workflow problem: status is {status.status} but endJob called, forcing error status')
        status.reset('ERROR')

    endScript(**kwargs)

def remove_path_from_tar_members(tf: tarfile.TarFile):
    """ strips the parent path from files within a tar file before untaring.
    means untarred files will not have subdirs. Only used with the tarfile
    package """

    for member in tf.getmembers():
        member.path = os.path.basename(member.path)
        yield member

def query_proceed(necessary_file, description):
    '''Return True if necessary_file exists; otherwise log and exit the
    script early (via endScript, which raises SystemExit).'''

    try:
        assert os.path.isfile(necessary_file)
        logger.info(f"Found:\n{necessary_file}\nso {description} job has succeeded for this date, this job shall run.")
    except AssertionError:
        logger.info(f"Failed to find:\n{necessary_file}\nso {description} job has not yet succeeded for this date, so cannot run this job.")
        endScript(premature=True)
        return False  # unreachable in practice: endScript exits

    return True

def _query_success_file(config_i, component, data_name, description):
    '''Check one data dependency (e.g. 'Deposition') via its success file.

    Falls back to AlternativeSuccessFileTemplate when the primary success
    file is absent; exits the script if neither can be satisfied.'''

    success_file = Template(config_i[component][data_name]['SuccessFileTemplate']).substitute(**config_i)
    try:
        query_proceed(success_file, description)
    except SystemExit:
        # query_proceed exits (SystemExit) on failure; intercept it so the
        # alternative success file can be tried
        if 'AlternativeSuccessFileTemplate' not in config_i[component][data_name]:
            logger.warning("No AlternativeSuccessFileTemplate to check for")
            endScript(premature=True)

        success_file_alt = Template(config_i[component][data_name]['AlternativeSuccessFileTemplate']).substitute(**config_i)
        query_proceed(success_file_alt, description)

def query_past_successes(input_args):
    '''Checks if deposition and environment jobs are already completed
    successfully. If not, it raises an error.'''

    component = input_args.component

    # check configs can be loaded
    config_fns = input_args.config_paths
    for configFile in config_fns:
        try:
            config_i = open_and_check_config(configFile)
        except Exception:
            logger.exception(f"Failure in opening or checking config {configFile}")
            endScript(premature=True)

        # some config initialisation is necessary
        config_i['StartString'] = input_args.start_date

        # check if deposition data is readily available
        _query_success_file(config_i, component, 'Deposition', 'deposition')

        # check if environment data is readily available
        _query_success_file(config_i, component, 'Environment', 'environment')

    return True