diff --git a/coordinator/Processor.py b/coordinator/Processor.py
index 3ce7217e459139bf4f74840e9c274792301633a4..ffcf3f28d4714b82181ba878359f3ecc1430a30d 100755
--- a/coordinator/Processor.py
+++ b/coordinator/Processor.py
@@ -17,7 +17,7 @@ or::
     $ ./run_Processor.sh -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
 '''
 from abc import abstractmethod, ABCMeta
-from typing import List, Union, Any
+from typing import List, Union, Any, Dict

 print("Make sure to `conda activate py3EWSepi` environment!")

@@ -62,29 +62,33 @@ class Processor:

     def __init__(self) -> None:
         super().__init__()
-        self.setup()
+        time_now = datetime.datetime.today()
+        # dateToday = timeNow.date()
+        self.todayString = time_now.strftime('%Y%m%d')
+        self.nowString = time_now.strftime('%Y%m%d-%H%M-%S')
+        # self.setup()

-    def setup(self):
+    def setup_logging(self, job_log_file_path: str, is_live: bool):

         # initialise default values for configuration

         script_name = 'Processor'

-        timeNow = datetime.datetime.today()
-        dateToday = timeNow.date()
-        self.todayString = timeNow.strftime('%Y%m%d')
-        self.nowString = timeNow.strftime('%Y%m%d-%H%M-%S')
-
         # get the path to this script
         script_path = os.path.dirname(__file__) + '/'

         coordinator_path = script_path

         # log file for all jobs
+        ##todo how does this work when there are several processors running at once, i.o. errors?
         log_path_project = f"{coordinator_path}logs/log.txt"

+        # make the job_log_file_path parent dirs if it does not exist
+        job_log_file_parent_dirs = os.path.dirname(job_log_file_path)
+        Path(job_log_file_parent_dirs).mkdir(parents=True, exist_ok=True)
+
         # job-specific log file will be written here until a job directory exits, when it will be moved there
-        self.log_path_default = f"{coordinator_path}logs/log_{self.nowString}.txt"
+        # self.log_path_default = f"{coordinator_path}logs/log_{self.nowString}.txt"

         # get the email credentials file path from the environment variables
         assert 'EMAIL_CRED' in os.environ
@@ -149,7 +153,7 @@ class Processor:
                 'level': 'INFO',
                 'formatter': 'simple',
                 'filters': ['mask_passwords'],
-                'filename': self.log_path_default,
+                'filename': job_log_file_path,
                 'mode': 'a', # 'a' for append
             },
             # to email errors to maintainers
@@ -192,6 +196,10 @@ class Processor:
         # create a logger named according to how the file is called
         # logger = logging.getLogger(__name__)
         self.logger = logging.getLogger(script_name)
+        if not is_live:
+            # remove the log handler that would send emails
+            self.logger.handlers = [h for h in self.logger.handlers if
+                                    not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]

         self.loglevels = {'debug': logging.DEBUG,
                           'info': logging.INFO,
@@ -281,7 +289,7 @@ class Processor:
         my_parser.add_argument(
                 '-l', '--loglevel',
                 action = 'store',
-                choices = list(self.loglevels.keys()),
+                choices = list(["debug", "info", "warning", "error", "critical"]),
                 default = 'info',
                 help = 'verbosity of log messaging (debug, info, warning, error, critical)\n default is debug',
                 dest = 'log_level', # this names the attribute that will be parsed
@@ -321,10 +329,10 @@ class Processor:

         # Check the args

-        self.logger.info(f"Command-line options are:\n{args}")
+        print(f"Command-line options are:\n{args}")

         if not isinstance(args.config_paths, list):
-            self.logger.error('Expecting a list of config paths')
+            print('Expecting a list of config paths')
             raise RuntimeError

         # check the startstring
@@ -340,24 +348,23 @@ class Processor:
                 assert provided_start_date <= today_date

         except (ValueError, AssertionError) as e:
-            self.logger.exception(
+            raise Exception(
                 "Provided start date string is formatted incorrectly or out of range, or end date not also defined")
-            raise

         dictionary: dict = vars(args)

         return dictionary

     def set_log_level(self, log_level: str):
-        new_log_level = self.loglevels[log_level]
+        new_log_level_code = self.loglevels[log_level]

         # modify log level of all self.loggers
-        self.logger.info(f"logging level being changed to {new_log_level} because of command-line option")
+        self.logger.info(f"logging level being changed to {new_log_level_code} ({log_level}) because of command-line option")
         loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
-        for logger_i in loggers: logger_i.setLevel(new_log_level)
+        for logger_i in loggers: logger_i.setLevel(new_log_level_code)

-    def build_universal_config(self, configs: list, component: str, universal_config = None):
+    def build_universal_config(self, configs: list, component: str, universal_config = None) -> dict:
         '''This config obtains aspects of each dict in configs that must be
         common to them all. '''
@@ -407,43 +414,65 @@ class Processor:

         return universal_config

+    def prepare_job_directory(self, job_path):
+        """
+        create job directory or archive if already exists (due to a rerun)
+
+        :param job_path:
+        :return:
+        """
+        if os.path.exists(job_path):
+            archived_dir_path = f"{job_path}_{self.nowString}"
+            message = f"Job path {job_path} already exists, so moving it to {archived_dir_path}"
+            print(message)
+            os.rename(job_path, archived_dir_path)
+
+        Path(job_path).mkdir(parents = True, exist_ok = False)
+

-    def run_process(self, args: dict):
+    def run_process(self,
+                    args: dict) -> None:

         # check initial state of each config file, and gather terms that must apply
         # across all provided configs

-        if not args["live"]:
-            # remove the log handler that would send emails
-            self.logger.handlers = [h for h in self.logger.handlers if
-                                    not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]
-
         config_paths: List[str] = args['config_paths']
         component: str = args['component']
         start_date: str = args['start_date']
         noupload: bool = args['noupload']
         clearup: bool = args['clearup']

+        # load universal configuration
         universal_config = self.build_universal_config(config_paths, component)

-        universal_config['StartString'] = start_date
-        self.logger.info(f"Universal config is\n{json.dumps(universal_config, indent = 2)}")
+        # determine job directory
+        workspacePath = universal_config['WorkspacePathout']
+        job_path: str = f'{workspacePath}{short_name[component]}_{start_date}'
+        self.prepare_job_directory(job_path)
+
+        log_file_path = f"{job_path}/log.txt"
+        is_live: bool = args["live"]
+        self.setup_logging(log_file_path, is_live)
+
+        self.set_log_level(args['log_level'])
+        self.set_component_logger()
+
+        self.logger.info("==========")
+        self.logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
+        self.logger.info(f"Universal config is\n{json.dumps(universal_config, indent = 2)}")
+        self.logger.info(f"Job path will be {job_path}")

         workspacePath = universal_config['WorkspacePathout']

         # process_pre_job = getattr(ProcessorComponents, universal_config['ProcessPreJob'])
         #
         # process_in_job = getattr(ProcessorComponents, universal_config['ProcessInJob'])
         #
-        # process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting'])
-
-        # determine job directory
-        jobPath: str = f'{workspacePath}{short_name[component]}_{start_date}'
-        self.logger.info(f"Job path will be {jobPath}")
+        # process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting'])

         # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC
-        startTime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H')
+        start_time: datetime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H')
+        start_time_string: str = start_time.strftime('%Y-%m-%d-%H%M')

         # run any checks before creating a job directory
         # if this fails, then make a note once there is a job directory
@@ -452,12 +481,9 @@ class Processor:
         # if not ready:
         #     self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it")

-        # create job directory
-        Path(jobPath).mkdir(parents = True, exist_ok = True)
-
         # lock job directory
         status: jobStatus
-        with jobStatus(jobPath) as status:
+        with jobStatus(job_path) as status:

             # lawrence comment in/out
             # check for a status file in job directory
@@ -468,10 +494,7 @@ class Processor:

             self.logger.info(f"Current status of job directory is {status.status}")

-            # now that we have a useable job directory, move the log file there
-            logPathJob = f"{jobPath}/log.txt"
-
-            self.move_default_logfile_handler(dstPathName = logPathJob)
+            # self.move_default_logfile_handler(dstPathName = logPathJob)

             # make a record if process_pre_job failed
             if not ready:
@@ -504,7 +527,7 @@ class Processor:

                 # provide specific case details to template config

-                configjson['StartTime'] = startTime.strftime('%Y-%m-%d-%H%M')
+                configjson['StartTime'] = start_time_string
                 configjson['StartString'] = start_date

                 # from configtemplate create configFileName to describe the specific job
@@ -514,13 +537,13 @@ class Processor:
                 configjson['ConfigFilePath'] = configFileName

                 # write the complete configuration file to job directory
-                with open(f"{jobPath}/{configFileName}.json", 'w') as write_file:
+                with open(f"{job_path}/{configFileName}.json", 'w') as write_file:
                     json.dump(configjson, write_file, indent = 4)

                 # proc_description = universal_config['ProcessInJob']
                 proc_description = 'ProcessInJob'
                 try:
-                    proc_out = self.process_in_job(jobPath, status, configjson, component)
+                    proc_out = self.process_in_job(job_path, status, configjson, component)
                 except:
                     self.logger.exception(f"Error in process_in_job")
                     status.reset('ERROR')
@@ -552,7 +575,7 @@ class Processor:
                 # proc_description = universal_config['ProcessEWSPlotting']
                 proc_description = 'ProcessEWSPlotting'
                 try:
-                    EWSPlottingOutputs = self.process_post_job(jobPath, configjson)
+                    EWSPlottingOutputs = self.process_post_job(job_path, configjson)
                 except:
                     self.logger.exception(f"Error in {proc_description}()")
                     status.reset('ERROR')
@@ -633,22 +656,27 @@ class Processor:
     def process_post_job(self, jobPath, configjson):
         raise NotImplementedError

+    @abstractmethod
+    def set_component_logger(self):
+        """
+        overridden in sub classes to set component-specific loggers
+        :return:
+        """
+        raise NotImplementedError

     def run_processor(self, component: str):
         print("Make sure to `conda activate py3EWSepi` environment!")
         print("Make sure that flagdir package is available (on PYTHONPATH)")

         try:
-            self.logger.info("==========")
-            self.logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
-            # load configurations
-            args_dict: dict = self.parse_and_check_args()
-            args_dict["component"] = component
-            self.set_log_level(args_dict['log_level'])
-            self.run_process(args_dict)
+            args: dict = self.parse_and_check_args()
+            args["component"] = component
+
+            self.run_process(args)
+
         except SystemExit as e:
             print("caught with code " + str(e.code))
-            self.logger.info('run_process() exited')
+            if self.logger:
+                self.logger.info('run_process() exited')
             sys.exit(e.code)
         except:
-            self.logger.exception('Uncaught exception in run_Process:')
-            sys.exit(1)
+            raise Exception('Uncaught exception in run_Process:')
diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/ProcessorAdvisory.py
index 582bd9e8f98338eaa9d24f6091ed5a7feaf36470..75f033df07c25c1d35476741ea86de2b7f7aa744 100644
--- a/coordinator/ProcessorAdvisory.py
+++ b/coordinator/ProcessorAdvisory.py
@@ -19,8 +19,14 @@ from ProcessorUtils import (

 class ProcessorAdvisory(Processor):

+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Advisory')
+        add_filters_to_sublogger(logger)
+
+
     def process_pre_job(self, args) -> bool:
-        return self.process_pre_job_advisory(args)
+        # return self.process_pre_job_advisory(args)
+        return True

     def process_in_job(self, jobPath, status, configjson, component) -> object:
@@ -33,40 +39,7 @@ class ProcessorAdvisory(Processor):

     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Advisory')
-        add_filters_to_sublogger(logger)
-
-    def process_pre_job_advisory(self,input_args: dict) -> bool:
-        # self.logger.info('started process_pre_job_advisory()')
-        #
-        # # check configs can be loaded
-        # config_fns: List[str] = input_args['config_paths']
-        # for configFile in config_fns:
-        #     try:
-        #         config_i = open_and_check_config(configFile)
-        #     except:
-        #         self.logger.exception(f"Failure in opening or checking config {configFile}")
-        #         endScript(premature=True)
-
-        # check pre-requisite jobs are complete
-
-        # Which jobs to check are defined by the config. It may include epi
-        # and surveys.
-
-        # Note that yesterday's surveys should also have succeeded, but that
-        # needs a YesterdayDateString available for the survey part of
-        # advisory config to point to the status file.
-
-        # dependent_components = config_i['Advisory'].get(
-        #         'DependentComponents',
-        #         ['Deposition','Environment'])
-
-        # for dependent_component in dependent_components:
-        #
-        #     query_past_successes(input_args, dependent_component)
-
-        return True

     def process_in_job_advisory(self, jobPath, status, config, component):
         '''Generates a word processor file containing some basic survey statistics
@@ -106,4 +79,4 @@ class ProcessorAdvisory(Processor):

 if __name__ == '__main__':
     processor = ProcessorAdvisory()
-    processor.run_processor("Advisory")
\ No newline at end of file
+    processor.run_processor("Advisory")
diff --git a/coordinator/ProcessorComponents.py b/coordinator/ProcessorComponents.py
index 53a132ead95dff9ed484951dc8d5421aeec03108..f2e839138d6dcebb3b53c99d27807d44a5787581 100644
--- a/coordinator/ProcessorComponents.py
+++ b/coordinator/ProcessorComponents.py
@@ -12,14 +12,14 @@ from typing import List

 # coordinator stages: pre, in (during) and plotting.

-from ProcessorServer import (
-    process_pre_job_server_download,
-    upload
-)
+# from ProcessorServer import (
+#     process_pre_job_server_download,
+#     upload
+# )

 from ProcessorUtils import (
     add_filters_to_sublogger,
-    query_past_successes
+    # query_past_successes
 )

 # TODO: Replace subprocess scp and ssh commands with paramiko.SSHClient() instance
diff --git a/coordinator/ProcessorDeposition.py b/coordinator/ProcessorDeposition.py
index 63adc080daaf2224aa5983929aeedb183c2738bb..e84d5fcb8fdc9144fbcb085a90eba201e4da04a9 100644
--- a/coordinator/ProcessorDeposition.py
+++ b/coordinator/ProcessorDeposition.py
@@ -38,10 +38,14 @@ class ProcessorDeposition(Processor):

     def __init__(self) -> None:
         super().__init__()
+
+    """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """
+
+
+    def set_component_logger(self):
         logger = logging.getLogger('Processor.Deposition')
         add_filters_to_sublogger(logger)

-    """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """

     def process_in_job_dep(self, jobPath, status, config, component):
         self.logger.info('started process_in_job_dep()')
@@ -185,3 +189,4 @@ class ProcessorDeposition(Processor):
 if __name__ == '__main__':
     processor = ProcessorDeposition()
     processor.run_processor("Deposition")
+
diff --git a/coordinator/ProcessorEnvironment.py b/coordinator/ProcessorEnvironment.py
index a2a34c64d9f5dacbb94c9c8a99b1d16d985f7f22..c927e41aa0a818f55e201d8941c2fc2d19d64e44 100644
--- a/coordinator/ProcessorEnvironment.py
+++ b/coordinator/ProcessorEnvironment.py
@@ -22,7 +22,11 @@ from ProcessorUtils import (

 class ProcessorEnvironment(Processor):
-
+
+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Environment')
+        add_filters_to_sublogger(logger)
+
     def process_pre_job(self, args):
         # return process_pre_job_server_download(args)
         return True
@@ -37,8 +41,6 @@ class ProcessorEnvironment(Processor):

     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Environment')
-        add_filters_to_sublogger(logger)

     def process_in_job_env2_0(self, jobPath,status,config,component):
         '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.'''
diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py
index 23584097bbdc6ad3d6575677a7ed4e043732c7fa..b26a20dca20bfc83574bad46c1bbb8410b0ff891 100644
--- a/coordinator/ProcessorEpidemiology.py
+++ b/coordinator/ProcessorEpidemiology.py
@@ -36,6 +36,11 @@ from ProcessorUtils import (
 )

 class ProcessorEpidemiology(Processor):
+
+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Epi')
+        add_filters_to_sublogger(logger)
+
     def process_pre_job(self, args) -> bool:
         return self.process_pre_job_epi(args)
@@ -50,8 +55,6 @@ class ProcessorEpidemiology(Processor):

     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Epi')
-        add_filters_to_sublogger(logger)

     def process_pre_job_epi(self, input_args: dict):
         '''Returns a boolean as to whether the job is ready for full processing.'''
diff --git a/coordinator/ProcessorScraper.py b/coordinator/ProcessorScraper.py
index 3d98905a2c51edd1cd6acb2937e402a8fea745ca..1b2d9336a40dcebcebb8afe9ab4dfdfa6f43314e 100644
--- a/coordinator/ProcessorScraper.py
+++ b/coordinator/ProcessorScraper.py
@@ -34,6 +34,11 @@ class ProcessorScraper(Processor):

     """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """

+
+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Scraper')
+        add_filters_to_sublogger(logger)
+
     def process_pre_job(self, args):
         return True
@@ -45,11 +50,9 @@ class ProcessorScraper(Processor):

     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Scraper')
-        add_filters_to_sublogger(logger)
-    """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """
+    """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """

     # date format conforms to format used in SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv
     #   ODK v1.11.2:
diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py
index a215b992da1616d768ce179ef2cb120689b61fdf..2bfffc66d9db9b164afee1bd63ec1b8b070c4666 100644
--- a/coordinator/ProcessorSurveys.py
+++ b/coordinator/ProcessorSurveys.py
@@ -55,8 +55,13 @@ from ProcessorUtils import (

 class ProcessorSurveys(Processor):

+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Surveys')
+        add_filters_to_sublogger(logger)
+
     def process_pre_job(self, args):
-        return self.process_pre_job_survey(args)
+        # return self.process_pre_job_survey(args)
+        return True

     def process_in_job(self, jobPath, status, configjson, component) -> object:
@@ -69,8 +74,6 @@ class ProcessorSurveys(Processor):

     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Surveys')
-        add_filters_to_sublogger(logger)

         self.GET_FORM_AS_CSV_DICT = {
             'ODK': get_ODK_form_as_csv,
@@ -83,11 +86,11 @@ class ProcessorSurveys(Processor):
             'newODK2' : get_newODK2_form_as_csv,
         }

-    def process_pre_job_survey(self, input_args):
-        '''Returns a boolean as to whether the job is ready for full processing.'''
-        self.logger.info('started process_pre_job_survey(), nothing to do')
-
-        return True
+    # def process_pre_job_survey(self, input_args):
+    #     '''Returns a boolean as to whether the job is ready for full processing.'''
+    #     self.logger.info('started process_pre_job_survey(), nothing to do')
+    #
+    #     return True

     def process_in_job_survey(self, jobPath,status,config,component):
         self.logger.info('started process_in_job_survey()')
diff --git a/tests/integration/partial/integration_test_utils.py b/tests/integration/partial/integration_test_utils.py
index c52a98a3bfefa7fb723d61356085b2a3c4533d35..0bb1966b4116746a9acf6e3c222b5a504a6f7702 100644
--- a/tests/integration/partial/integration_test_utils.py
+++ b/tests/integration/partial/integration_test_utils.py
@@ -10,6 +10,7 @@ from zipfile import ZipFile

 from HTMLTestRunner import HTMLTestRunner

+import ProcessorUtils
 from Processor import Processor

@@ -189,20 +190,31 @@ class IntegrationTestUtils:

         args_dict: dict = {}

+        config_paths = [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME]
+
         # note, possible to override these values in the kwargs loop below
         args_dict['live'] = False
         args_dict['noupload'] = True
         args_dict['start_date'] = start_date
         args_dict['component'] = component
-        args_dict['config_paths'] = [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME]
+        args_dict['config_paths'] = config_paths
         args_dict['log_level'] = 'info'
         args_dict['clearup'] = True

         for key, value in kwargs.items():
             args_dict[key] = value

-        log_level = args_dict['log_level']
-        processor.set_log_level(log_level)
+        # universal_config: dict = processor.build_universal_config(config_paths, component)
+        # workspacePath = universal_config['WorkspacePathout']
+        #
+        # job_path: str = f'{workspacePath}{ProcessorUtils.short_name[component]}_{start_date}'
+        # logPath = f"{job_path}/log.txt"
+        #
+        # is_live: bool = args_dict['live']
+        # processor.setup_logging(logPath, is_live)
+        #
+        # log_level = args_dict['log_level']
+        # processor.set_log_level(log_level)

         try:
             processor.run_process(args_dict)