diff --git a/coordinator/Processor.py b/coordinator/Processor.py
index 09f1eb891e6ae8b01092c4e085502e7fb2b14d4d..25f379920b0012650e7e01361749de6c7326fd96 100755
--- a/coordinator/Processor.py
+++ b/coordinator/Processor.py
@@ -49,6 +49,11 @@ from ProcessorUtils import (
     short_name
 )
 
+"""
+Module-level logger; its handlers are configured by Processor.setup_logging() when the processor is run from the command line
+"""
+logger = logging.getLogger(__name__)
+
 
 class Processor:
 
@@ -72,8 +77,6 @@ class Processor:
 
     def setup_logging(self, job_log_file_path: str, is_live: bool):
         # initialise default values for configuration
 
-        script_name = 'Processor'
-
         # get the path to this script
         script_path = os.path.dirname(__file__) + '/'
 
@@ -83,10 +86,6 @@
         ##todo how does this work when there are several processors running at once, i.e. errors?
         log_path_project = f"{coordinator_path}logs/log.txt"
 
-        # make the job_log_file_path parent dirs if it does not exist
-        job_log_file_parent_dirs = os.path.dirname(job_log_file_path)
-        Path(job_log_file_parent_dirs).mkdir(parents=True, exist_ok=True)
-
         # job-specific log file will be written here until a job directory exists, when it will be moved there
         # self.log_path_default = f"{coordinator_path}logs/log_{self.nowString}.txt"
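Note on the hunk at line 49 above: the new module-level `logger = logging.getLogger(__name__)` replaces the `self.logger` attribute used throughout the old code. A minimal, self-contained sketch of the pattern (the `Widget` class and `frobnicate()` method are hypothetical, not part of this codebase):

```python
import logging

# one logger per module, named after the module's dotted import path
logger = logging.getLogger(__name__)


class Widget:
    """Methods log through the module-level logger; no self.logger needed."""

    def frobnicate(self) -> None:
        # any handler attached to this logger, or to an ancestor such as
        # the root logger, receives this record via propagation
        logger.info("frobnicate() called")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format="%(name)s : %(levelname)s - %(message)s")
    Widget().frobnicate()
```

Because loggers form a dot-separated hierarchy, handlers configured once near the root reach every `__name__`-based logger, which is what lets the subclasses later in this patch stub out their `set_component_logger()` bodies.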
@@ -103,110 +102,26 @@ class Processor:
 
         for required_key in required_keys:
             assert required_key in gmail_config
 
-        # load config from python dictionary (could be loaded from json file)
+        arg_string = ' '.join(sys.argv)
+
+        # load the logging config template from a json file
+        json_file_path = "/home/lb584/git/ews-coordinator/configs/logger/log_config.json"
+        with open(json_file_path, 'r') as f:
+            log_config_dict = json.load(f)
+
+        # substitute values that are only known at runtime into the template
+        log_config_dict['formatters']['detailed']['format'] = log_config_dict['formatters']['detailed']['format'].replace("{arg_string}", arg_string)
+
+        log_config_dict['filters']['mask_passwords']['()'] = PasswordODKFilter
+
+        log_config_dict['handlers']['handler_rot_file']['filename'] = log_path_project
+
         # TODO: smtp handler can only use tls, but ssl is more secure. Look into defining/writing a suitable smtp handler
-        logConfigDict = {
-            'version': 1,
-            'disable_existing_loggers': False,
-            'formatters': {
-                'simple': {
-                    'format': '%(name)s : %(levelname)s - %(message)s'
-                },
-                'detailed': {
-                    'format': f"""
-                    For command:
-                    {' '.join(sys.argv)}
-
-                    %(levelname)s in %(name)s encountered at %(asctime)s:
-
-                    %(message)s
-
-                    Resolve this error and restart processing.
-
-                    """,
-                    'datefmt': '%Y-%m-%d %H:%M:%S'
-                }
-            },
-            'filters': {
-                'mask_passwords': {
-                    '()': PasswordODKFilter
-                }
-            },
-            'handlers': {
-
-                # logging for project
-                'handler_rot_file': {
-                    'class': 'logging.handlers.TimedRotatingFileHandler',
-                    'level': 'INFO',
-                    'formatter': 'simple',
-                    'filters': ['mask_passwords'],
-                    'filename': log_path_project,
-                    # time of day on given day
-                    'when': 'W2',  # rotate file every Wednesday
-                    'atTime': datetime.time(1, 0, 0),  # at 1am local time
-                    'backupCount': 12,
-                },
-
-                # logging for job
-                'handler_file': {
-                    'class': 'logging.FileHandler',
-                    'level': 'INFO',
-                    'formatter': 'simple',
-                    'filters': ['mask_passwords'],
-                    'filename': job_log_file_path,
-                    'mode': 'a',  # 'a' for append
-                },
-                # logging for job to stderr (useful for Airflow to catch in its logs)
-                'handler_stderr': {
-                    'class': 'logging.StreamHandler',
-                    'level': 'INFO',
-                    'formatter': 'simple',
-                    'filters': ['mask_passwords']
-                },
-                # to email errors to maintainers
-                'handler_buffered_email': {
-                    'class': 'BufferingSMTPHandler.BufferingSMTPHandler',
-                    'level': 'ERROR',
-                    'server': (gmail_config['host'], gmail_config['port']),
-                    # host, port. 465 for SSL, 587 for tls
-                    'credentials': (gmail_config['user'], gmail_config['pass']),
-                    'fromaddr': gmail_config['user'],
-                    'toaddrs': gmail_config['toaddrs'],
-                    'subject': 'ERROR in EWS Processor',
-                    'formatter': 'detailed',
-                    'filters': ['mask_passwords'],
-                    'capacity': 100
-                },
-            },
-            'loggers': {
-                # this is activated when this script is imported
-                # i.e. with logging.getLogger('Process.')
-                script_name: {
-                    'level': 'INFO',
-                    'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email', 'handler_stderr'],
-                    'propagate': True,
-                },
-                # this is activated when this script is called on the command line
-                # or from a bash script
-                # i.e. with logging.getLogger(__name__) when name == '__main__'
-                '__main__': {
-                    'level': 'INFO',
-                    'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email', 'handler_stderr'],
-                    'propagate': True,
-                }
-            }
-        }
-
-        logging.config.dictConfig(logConfigDict)
-
-        print(__name__)
-        # create a logger named according to how the file is called
-        # logger = logging.getLogger(__name__)
-        self.logger = logging.getLogger(script_name)
-        if not is_live:
-            # remove the log handler that would send emails
-            self.logger.handlers = [h for h in self.logger.handlers if
-                                    not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]
+        log_config_dict['handlers']['handler_buffered_email']['server'] = (gmail_config['host'], gmail_config['port'])
+        log_config_dict['handlers']['handler_buffered_email']['credentials'] = (gmail_config['user'], gmail_config['pass'])
+        log_config_dict['handlers']['handler_buffered_email']['fromaddr'] = gmail_config['user']
+        log_config_dict['handlers']['handler_buffered_email']['toaddrs'] = gmail_config['toaddrs']
+
+        logging.config.dictConfig(log_config_dict)
 
         self.loglevels = {'debug': logging.DEBUG,
                           'info': logging.INFO,
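The hunk above replaces the hard-coded `logConfigDict` with a JSON template that is patched at runtime (formatter placeholder, filter class, handler filename, SMTP settings) before `dictConfig()` is applied. A self-contained sketch of the same technique, with a hypothetical `MaskSecretsFilter` standing in for `PasswordODKFilter` and the template inlined rather than read from disk:

```python
import json
import logging
import logging.config
import sys


class MaskSecretsFilter(logging.Filter):
    """Hypothetical stand-in for PasswordODKFilter."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.msg = str(record.msg).replace("secret", "******")
        return True  # keep the (now masked) record


# the template holds everything that is static...
CONFIG_TEMPLATE = """
{
  "version": 1,
  "disable_existing_loggers": false,
  "formatters": {
    "detailed": {"format": "cmd={arg_string} | %(levelname)s - %(message)s"}
  },
  "filters": {"mask_passwords": {}},
  "handlers": {
    "handler_stderr": {
      "class": "logging.StreamHandler",
      "formatter": "detailed",
      "filters": ["mask_passwords"]
    }
  },
  "root": {"level": "INFO", "handlers": ["handler_stderr"]}
}
"""

config = json.loads(CONFIG_TEMPLATE)

# ...and values only known at runtime are patched in before dictConfig()
fmt = config["formatters"]["detailed"]["format"]
config["formatters"]["detailed"]["format"] = fmt.replace("{arg_string}", " ".join(sys.argv))
config["filters"]["mask_passwords"]["()"] = MaskSecretsFilter  # "()" names the factory to call

logging.config.dictConfig(config)
logging.getLogger(__name__).info("connected with password=secret")
```

The `"()"` key is how `dictConfig()` lets a config entry name (or, as here, directly hold) a factory callable, which is why the patch can inject `PasswordODKFilter` into a template loaded from plain JSON.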
@@ -215,49 +130,22 @@
                           'critical': logging.CRITICAL,
                           }
 
+    def set_job_file_logger(self, job_log_file_path: str):
+        job_file_handler = logging.FileHandler(job_log_file_path, mode = 'a')
+        job_file_handler.setLevel(logging.INFO)
+        job_file_handler.setFormatter(logging.Formatter('%(name)s : %(levelname)s - %(message)s'))
+        job_file_handler.addFilter(PasswordODKFilter())
+        job_file_handler.set_name('handler_file')
+        logger.addHandler(job_file_handler)
+        # todo we can get rid of this once we make all ews packages share a common namespace
+        # note: getLogger() returns the true root logger; getLogger("root") would only
+        # return an ordinary logger that happens to be named "root"
+        logging.getLogger().addHandler(job_file_handler)
 
-    # def move_default_logfile_handler(self, dstPathName, srcPathName = None, FileHandlerName = 'handler_file', ):
-    #     '''For on-the-fly move of logging from default file to another. Copies the
-    #     contents of the source log file to destination, switches file handler in
-    #     logger, then removes source log file.'''
-    #
-    #     if srcPathName is None:
-    #         srcPathName = self.log_path_default
-    #
-    #     self.logger.info(f"Moving logfile location from:\n{srcPathName}\nto:\n{dstPathName}")
-    #
-    #     # copy old log file to new filename
-    #     srcPath = Path(srcPathName)
-    #     dstPath = Path(dstPathName)
-    #     assert srcPath.exists()
-    #     assert dstPath.parent.is_dir()
-    #
-    #     oldFileHandler = [h for h in self.logger.handlers if h.name == FileHandlerName][0]
-    #     oldFormatter = oldFileHandler.formatter
-    #
-    #     # define new file handler
-    #     newfilehandler = logging.FileHandler(dstPath, mode = oldFileHandler.mode)
-    #     newfilehandler.setLevel(oldFileHandler.level)
-    #     newfilehandler.setFormatter(oldFormatter)
-    #
-    #     shutil.copyfile(srcPath, dstPath)
-    #
-    #     # add handler for destination file
-    #     self.logger.info('Adding new logging file handler to destination path')
-    #
-    #     self.logger.addHandler(newfilehandler)
-    #
-    #     # remove handler for source file
-    #     self.logger.info('Stopping write to old file handler')
-    #     self.logger.removeHandler(oldFileHandler)
-    #     oldFileHandler.close()
-    #     self.logger.info('Successfully stopped write to old file handler')
-    #
-    #     # delete old log file
-    #     self.logger.info('Deleting old log file, since all content available in new log file stream')
-    #     os.rename(srcPathName, srcPathName + 'removed')
-    #
-    #     return
+
+    def set_is_logging_live(self, is_live: bool):
+        if not is_live:
+            # remove the log handler that would send emails
+            logger.handlers = [h for h in logger.handlers if
+                               not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]
 
     def parse_and_check_args(self) -> dict:
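`set_job_file_logger()` attaches the per-job file handler only once the job directory is known, and `set_is_logging_live()` strips the buffered e-mail handler for dry runs. A standalone sketch under the assumption that the project's `BufferingSMTPHandler` subclasses the stdlib `logging.handlers.BufferingHandler` (used here as a stand-in):

```python
import logging
import logging.handlers

logger = logging.getLogger(__name__)


def attach_job_file_handler(job_log_file_path: str) -> None:
    """Attach a per-job FileHandler once the job directory is known."""
    handler = logging.FileHandler(job_log_file_path, mode="a")
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter("%(name)s : %(levelname)s - %(message)s"))
    handler.set_name("handler_file")
    logger.addHandler(handler)
    # also attach to the root logger so records from packages outside this
    # namespace end up in the job log too
    logging.getLogger().addHandler(handler)


def drop_email_handlers(is_live: bool) -> None:
    """For dry runs, remove any handler that would e-mail maintainers."""
    if not is_live:
        logger.handlers = [
            h for h in logger.handlers
            if not isinstance(h, logging.handlers.BufferingHandler)
        ]
```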
@@ -365,8 +253,8 @@
     def set_log_level(self, log_level: str):
         new_log_level_code = self.loglevels[log_level]
 
-        # modify log level of all self.loggers
-        self.logger.info(f"logging level being changed to {new_log_level_code} ({log_level}) because of command-line option")
+        # modify log level of all loggers
+        logger.info(f"logging level being changed to {new_log_level_code} ({log_level}) because of command-line option")
         loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
         for logger_i in loggers:
             logger_i.setLevel(new_log_level_code)
@@ -397,7 +285,7 @@
         except:
 
-            self.logger.exception(f"Failure in opening or checking config {configFile}")
+            logger.exception(f"Failure in opening or checking config {configFile}")
 
             endScript(premature = True)
 
         for key in keys:
@@ -414,7 +302,7 @@
         for key in keys:
             if len(universal_config[key]) > 1:
-                self.logger.error(f"Config files point to multiple {key} but this script can only handle one.")
+                logger.error(f"Config files point to multiple {key} but this script can only handle one.")
                 endScript(premature = True)
 
             universal_config[key] = universal_config[key].pop()
@@ -439,6 +327,7 @@
     def run_process(self, args: dict) -> None:
+
 
         # check initial state of each config file, and gather terms that must apply
         # across all provided configs
@@ -459,15 +348,17 @@
         log_file_path = f"{job_path}/log.txt"
 
         is_live: bool = args["live"]
 
+        logger.info(f"about to configure logging; job log file will be {log_file_path}")
         self.setup_logging(log_file_path, is_live)
+        self.set_job_file_logger(log_file_path)
+        self.set_is_logging_live(is_live)
 
         self.set_log_level(args['log_level'])
 
-        self.set_component_logger()
-
-        self.logger.info("==========")
-        self.logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
-        self.logger.info(f"Universal config is\n{json.dumps(universal_config, indent = 2)}")
-        self.logger.info(f"Job path will be {job_path}")
+        logger.info("==========")
+        logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
+        logger.info(f"Universal config is\n{json.dumps(universal_config, indent = 2)}")
+        logger.info(f"Job path will be {job_path}")
 
         workspacePath = universal_config['WorkspacePathout']
 
         # process_pre_job = getattr(ProcessorComponents, universal_config['ProcessPreJob'])
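`set_log_level()` above reaches into `logging.root.manager.loggerDict`, the registry behind `logging.getLogger(name)`. A sketch of that technique; note it only affects loggers that already exist at call time, so it is called after all the logging setup above:

```python
import logging


def set_global_log_level(level: int) -> None:
    """Apply one level to the root logger and every logger created so far."""
    # loggerDict can hold PlaceHolder entries for intermediate names;
    # getLogger(name) transparently materialises them into real Loggers
    for name in logging.root.manager.loggerDict:
        logging.getLogger(name).setLevel(level)
    logging.getLogger().setLevel(level)  # include the root logger itself


set_global_log_level(logging.DEBUG)
```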
@@ -486,7 +377,7 @@
         ready = self.process_pre_job(args)
         #
         # if not ready:
-        #     self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it")
+        #     logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it")
 
         # lock job directory
         status: jobStatus
@@ -495,17 +386,17 @@
             # lawrence comment in/out
             # check for a status file in job directory
             if status.had_initial_status:
-                self.logger.info(f"Job path already exists and has status {status.status}")
+                logger.info(f"Job path already exists and has status {status.status}")
 
                 endScript(premature = status.status not in ['SUCCESS', 'INPROGRESS'])
 
-            self.logger.info(f"Current status of job directory is {status.status}")
+            logger.info(f"Current status of job directory is {status.status}")
 
             # self.move_default_logfile_handler(dstPathName = logPathJob)
 
             # make a record if process_pre_job failed
             if not ready:
-                self.logger.error(
+                logger.error(
                         f"Process_pre_job raised an error so making a record in the job file. For details, see earlier warnings log")
 
                 status.reset('ERROR')
@@ -518,17 +409,17 @@
             # successful job
             paths_to_clear = []
 
-            self.logger.info('Starting to work on each configuration')
+            logger.info('Starting to work on each configuration')
 
             for configIndex, configtemplate in enumerate(config_paths):
 
                 config_paths_length: int = len(config_paths)
-                self.logger.info(f'Working on config {configIndex + 1} of {config_paths_length}')
+                logger.info(f'Working on config {configIndex + 1} of {config_paths_length}')
 
                 try:
                     configjson = open_and_check_config(configtemplate)
                 except:
-                    self.logger.exception(f"Failure in opening or checking config {configtemplate}")
+                    logger.exception(f"Failure in opening or checking config {configtemplate}")
                     # TODO: This case should test flagdir.jobStatus.__exit__()
                     raise  # endJob('ERROR',premature=True)
@@ -552,7 +443,7 @@
                 try:
                     proc_out: dict = self.process_in_job(job_path, status, configjson, component)
                 except:
-                    self.logger.exception(f"Error in process_in_job")
+                    logger.exception(f"Error in process_in_job")
 
                     status.reset('ERROR')
                     endJob(status, premature = True)
@@ -584,11 +475,11 @@
                 try:
                     ews_plotting_outputs = self.process_post_job(job_path, configjson)
                 except:
-                    self.logger.exception(f"Error in {proc_description}()")
+                    logger.exception(f"Error in {proc_description}()")
                     status.reset('ERROR')
                     endJob(status, premature = True)
 
-                self.logger.info('Finished with EWS-Plotting, appending images to list for transfer')
+                logger.info('Finished with EWS-Plotting, appending images to list for transfer')
 
                 # if ews_plotting_outputs:
                 #     append_item_to_list(
@@ -597,7 +488,7 @@
                 #         proc_description,
                 #         status)
 
-                self.logger.info(f'Finished with config {configIndex + 1} of {config_paths_length}')
+                logger.info(f'Finished with config {configIndex + 1} of {config_paths_length}')
 
             # # send results to remote server
             #
             #
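The per-config loop above swaps every `self.logger` call for the module logger and relies on `logger.exception()` inside `except:` blocks to capture tracebacks before aborting. A minimal sketch of that error-reporting shape (`run_one_config()` is a hypothetical stand-in for `open_and_check_config()` plus `process_in_job()`):

```python
import logging

logger = logging.getLogger(__name__)


def run_one_config(path: str) -> None:
    """Hypothetical per-config worker."""
    ...


def process_configs(config_paths: list[str]) -> None:
    total = len(config_paths)
    for index, path in enumerate(config_paths):
        logger.info("Working on config %s of %s", index + 1, total)
        try:
            run_one_config(path)
        except Exception:
            # logger.exception() logs at ERROR level and appends the full
            # traceback, so file and e-mail handlers see the whole failure
            logger.exception("Failure while processing config %s", path)
            raise
```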
@@ -608,18 +499,18 @@
             #         status.reset('WARNING')
             #
             # except:
-            #     self.logger.exception('Failed to upload files to remote server')
+            #     logger.exception('Failed to upload files to remote server')
             #     status.reset('ERROR')
             #     endJob(status, premature = True)
             #
             # # check if there is a second location on willow to provide results
             # if 'ServerPathExtra' in configjson[component]:
             #
-            #     self.logger.info('There is an extra path to send results to:')
+            #     logger.info('There is an extra path to send results to:')
             #
             #     extra_path = configjson[component]['ServerPathExtra']
             #
-            #     self.logger.info(extra_path)
+            #     logger.info(extra_path)
             #
             #     universal_config_extra = universal_config.copy()
             #     universal_config_extra['ServerPath'] = extra_path
@@ -630,22 +521,22 @@
             #         status.reset('WARNING')
             #
             # except:
-            #     self.logger.exception('Failed to upload files to extra directory on remote server')
+            #     logger.exception('Failed to upload files to extra directory on remote server')
             #     status.reset('ERROR')
             #     endJob(status, premature = True)
             #
             # else:
-            #     self.logger.info('Because noupload argument was present, not sending results to remote server')
+            #     logger.info('Because noupload argument was present, not sending results to remote server')
 
             status.reset('SUCCESS')
 
         if status.is_success() & (clearup is True):
-            self.logger.info('Clearing up')
+            logger.info('Clearing up')
 
             clearup_dest_dir = f"{workspacePath}/clearup/{short_name[component]}_{start_date}/"
             Path(clearup_dest_dir).mkdir(parents = True, exist_ok = True)
 
-            self.logger.info(f"While developing, moving directories to this directory : {clearup_dest_dir}")
+            logger.info(f"While developing, moving directories to this directory : {clearup_dest_dir}")
 
             clear_up(paths_to_clear, clearup_dest = clearup_dest_dir)
@@ -682,8 +573,8 @@
         except SystemExit as e:
             print("caught with code " + str(e.code))
-            if self.logger:
-                self.logger.info('run_process() exited')
+            if logger:
+                logger.info('run_process() exited')
             sys.exit(e.code)
         except:
             raise Exception('Uncaught exception in run_Process:')
diff --git a/coordinator/ProcessorDeposition.py b/coordinator/ProcessorDeposition.py
index 68fe833a65e1321bf95db569b5e8b811fe926e7c..a3569dd8e28aef90675c978a6e28fe4d0067a75d 100644
--- a/coordinator/ProcessorDeposition.py
+++ b/coordinator/ProcessorDeposition.py
@@ -18,6 +18,7 @@ from ProcessorUtils import (
 )
 from ews_postprocessing.deposition.deposition_post_processor import DepositionPostProcessor
 
+logger = logging.getLogger(__name__)
 
 class ProcessorDeposition(Processor):
 
@@ -43,23 +44,24 @@ class ProcessorDeposition(Processor):
 
     def set_component_logger(self):
-        logger = logging.getLogger('Processor.Deposition')
-        add_filters_to_sublogger(logger)
+        # logger = logging.getLogger('Processor.Deposition')
+        # add_filters_to_sublogger(logger)
+        pass
 
     def process_in_job_dep(self, jobPath, status, config, component):
-        self.logger.info('started process_in_job_dep()')
+        logger.info('started process_in_job_dep()')
 
         file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
         file_name = Template(config[component]['InputFileTemplate']).substitute(**config)
 
-        self.logger.info(f"Expecting to work with {file_name}")
+        logger.info(f"Expecting to work with {file_name}")
 
         if os.path.exists(f"{jobPath}/{file_name}"):
-            self.logger.info('Directory already exists in job directory, so nothing to do here')
+            logger.info('Directory already exists in job directory, so nothing to do here')
             return
 
-        self.logger.info('Copying file from remote server to job directory')
+        logger.info('Copying file from remote server to job directory')
 
         # TODO: perform ssh file transfer in python instead of subprocess
         server_name: str = config['ServerName']
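On the `# TODO: perform ssh file transfer in python instead of subprocess` note above: one option is paramiko's SFTP client. paramiko is not a dependency declared anywhere in this patch, so treat this as an assumption-laden sketch rather than the project's chosen approach:

```python
import paramiko  # third-party; assumption: would need to be added as a dependency


def fetch_input_file(server_name: str, remote_path: str, local_path: str) -> None:
    """Copy one file from the remote server without shelling out to scp."""
    client = paramiko.SSHClient()
    # trust the hosts already present in ~/.ssh/known_hosts
    client.load_system_host_keys()
    client.connect(server_name)  # auth via ssh-agent or default key files
    try:
        sftp = client.open_sftp()
        sftp.get(remote_path, local_path)
        sftp.close()
    finally:
        client.close()
```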
@@ -73,7 +75,7 @@
         description_long = 'scp from server to job directory'
         subprocess_and_log(cmd_scp, description_short, description_long)
 
-        self.logger.info('untarring the input file')
+        logger.info('untarring the input file')
 
         # TODO: untar file in python (with tarfile module) instead of subprocess
         cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath]
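On the `# TODO: untar file in python (with tarfile module) instead of subprocess` note above, a sketch using the stdlib `tarfile` module:

```python
import tarfile


def untar_input_file(job_path: str, file_name: str) -> None:
    """Extract the job's .tar.gz input in-process instead of shelling out to tar."""
    archive_path = f"{job_path}/{file_name}.tar.gz"
    with tarfile.open(archive_path, mode="r:gz") as archive:
        # the "data" filter (Python 3.12+, backported to older security
        # releases) rejects absolute paths and path traversal in the archive
        archive.extractall(path=job_path, filter="data")
```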
@@ -85,7 +87,7 @@
         # 132 files of NAME .txt timesteps and one summary png file
         # if len(glob(f"{jobPath}/{file_name}/deposition_srcs_allregions_C1_T*.txt")) != 56:
         #     msg = f"Unexpected number of deposition .txt files in input tar file. Expected 56."
-        #     self.logger.error(msg)
+        #     logger.error(msg)
         #     raise RuntimeError(msg)
 
         # basic check that contents are as expected (56 timepoints in the file)
@@ -96,7 +98,7 @@
             timepoint_count = coord.shape[0]
             if timepoint_count != 56:
                 msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}"
-                self.logger.error(msg)
+                logger.error(msg)
                 raise RuntimeError(msg)
 
         proc_out = {}
@@ -111,7 +113,7 @@
     def process_EWS_plotting_dep(self, jobPath, config) -> [str]:
         '''Returns a list of output files for transfer.'''
 
-        self.logger.info('started process_EWS_plotting_dep()')
+        logger.info('started process_EWS_plotting_dep()')
 
         # initialise environment
         regions = config['SubRegionNames']
@@ -143,7 +145,7 @@
 
         # Note that this runs all disease types available
 
-        self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{name_extraction_config}\n{run_config}\n{run_config_norm}\n{chart_config}")
+        logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{name_extraction_config}\n{run_config}\n{run_config_norm}\n{chart_config}")
 
         depo_processor = DepositionPostProcessor()
         depo_processor.set_param_config_files(sys_config_file_arg = sys_config,
@@ -177,7 +179,7 @@
 
         # check there is some output from EWS-plotting
         if not ews_plotting_output_globs:
-            self.logger.error('EWS-Plotting did not produce any output')
+            logger.error('EWS-Plotting did not produce any output')
             raise RuntimeError
 
         # provide list for transfer
@@ -187,6 +189,7 @@
 
 
 if __name__ == '__main__':
+    print(__name__)
 
     processor = ProcessorDeposition()
     processor.run_processor("Deposition")
diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py
index 2cda25ec07a00ed041be7ad8ed395e76048296a6..a8b563d6dbfbe76104fd249a7ebd1bb2e62f2ea3 100644
--- a/coordinator/ProcessorSurveys.py
+++ b/coordinator/ProcessorSurveys.py
@@ -36,7 +36,7 @@ from shutil import copyfile
 from pandas import read_csv, concat
 
 from Processor import Processor
-from source_gen.clustering import run_case
+from source_gen.clustering import run_case, logit
 
 from ProcessorSurveysODK import get_ODK_form_as_csv
 from ProcessorSurveysODKSA import get_ODK_SA_form_as_csv
@@ -56,8 +56,9 @@ from ProcessorUtils import (
 class ProcessorSurveys(Processor):
 
     def set_component_logger(self):
-        logger = logging.getLogger('Processor.Surveys')
-        add_filters_to_sublogger(logger)
+        # logger = logging.getLogger('Processor.Surveys')
+        # add_filters_to_sublogger(logger)
+        pass
 
     def process_pre_job(self, args):
         # return self.process_pre_job_survey(args)
@@ -93,6 +94,8 @@
         # return True
 
     def process_in_job_survey(self, jobPath,status,config,component):
+        logit()
+
         self.logger.info('started process_in_job_survey()')
 
         self.logger.debug('Performing download(s) from ODK server')
@@ -429,5 +432,6 @@
 
 if __name__ == '__main__':
+    print(__name__)
 
     processor = ProcessorSurveys()
     processor.run_processor("Survey")
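Finally, the logging config template instantiates its `mask_passwords` filter from `PasswordODKFilter`, whose implementation is not shown in this patch. A hypothetical sketch of what such a credential-masking filter looks like (the pattern and class name are illustrative only):

```python
import logging
import re


class PasswordMaskFilter(logging.Filter):
    """Hypothetical sketch of a mask_passwords-style filter."""

    PATTERN = re.compile(r"(--password[= ]|password=)\S+", re.IGNORECASE)

    def filter(self, record: logging.LogRecord) -> bool:
        # rewrite the message in place; returning True keeps the record
        record.msg = self.PATTERN.sub(r"\g<1>********", str(record.msg))
        return True


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("demo")
logger.addFilter(PasswordMaskFilter())
logger.info("connecting with --password hunter2")  # password is masked
```

Attaching the filter to both file and e-mail handlers, as the config template above does via `"filters": ["mask_passwords"]`, ensures ODK credentials passed on the command line never reach the logs.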