diff --git a/configs/logger/README.txt b/configs/logger/README.txt new file mode 100644 index 0000000000000000000000000000000000000000..6e28be0ee1f6d81b47c5c78d1bf43e21e3e90f83 --- /dev/null +++ b/configs/logger/README.txt @@ -0,0 +1,2 @@ +Make a copy of the template file and set an environment variable called 'LOGGING_CONFIG' to its path at runtime. You can set a file +path for your project-level logger in the copy, or leave it blank to use a default location \ No newline at end of file diff --git a/configs/logger/template_log_config.json b/configs/logger/template_log_config.json new file mode 100644 index 0000000000000000000000000000000000000000..47093c4070a3765ae49e5132be10de48afd80580 --- /dev/null +++ b/configs/logger/template_log_config.json @@ -0,0 +1,89 @@ +{ + "version": 1, + "disable_existing_loggers": false, + "formatters": { + "simple": { + "format": "%(name)s : %(levelname)s - %(message)s" + }, + "detailed": { + "format": "For command:\n{arg_string}\n\n%(levelname)s in %(name)s encountered at %(asctime)s:\n\n%(message)s\n\nResolve this error and restart processing.", + "datefmt": "%Y-%m-%d %H:%M:%S" + } + }, + "filters": { + "mask_passwords": { + "()": "NOT_SET" + } + }, + "handlers": { + "handler_project": { + "class": "logging.handlers.TimedRotatingFileHandler", + "formatter": "simple", + "level": "INFO", + "filters": [ + "mask_passwords" + ], + "filename": "", + "when": "W2", + "backupCount": 12 + }, + "handler_job": { + "class": "logging.FileHandler", + "formatter": "simple", + "level": "INFO", + "filters": [ + "mask_passwords" + ], + "filename": "" + }, + "handler_stderr": { + "class": "logging.StreamHandler", + "formatter": "simple", + "level": "INFO", + "filters": [ + "mask_passwords" + ] + }, + "handler_buffered_email": { + "class": "BufferingSMTPHandler.BufferingSMTPHandler", + "level": "ERROR", + "server": "NOT_SET", + "credentials": "NOT_SET", + "fromaddr": "NOT_SET", + "toaddrs": "NOT_SET", + "subject": "ERROR in EWS Processor", + "formatter": "detailed", + "filters": [ + "mask_passwords" + ], + "capacity": 100 + } + }, + "loggers": { + "__main__": { + "level": "INFO", + "handlers": ["handler_project","handler_job","handler_buffered_email","handler_stderr"], + "propagate": false + }, + "coordinator": { + "level": "INFO", + "handlers": ["handler_project","handler_job","handler_buffered_email","handler_stderr"], + "propagate": false + }, + "source_gen": { + "level": "INFO", + "handlers": ["handler_project","handler_job","handler_buffered_email","handler_stderr"], + "propagate": false + }, + "ews_postprocessing": { + "level": "INFO", + "handlers": ["handler_project","handler_job","handler_buffered_email","handler_stderr"], + "propagate": false + }, + "plotting": { + "level": "INFO", + "handlers": ["handler_project","handler_job","handler_buffered_email","handler_stderr"], + "propagate": false + } + } +} \ No newline at end of file diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/ProcessorAdvisory.py index 75f033df07c25c1d35476741ea86de2b7f7aa744..2d4ed9694ffbf4da8efa2e048b33ee2a80b0e1ba 100644 --- a/coordinator/ProcessorAdvisory.py +++ b/coordinator/ProcessorAdvisory.py @@ -7,7 +7,7 @@ from typing import List # gitlab projects # TODO: Package these projects so they are robust for importing from AdvisoryBuilder import DataGatherer # created by jws52 -from Processor import Processor +from processor_base import ProcessorBase from ProcessorUtils import ( add_filters_to_sublogger, @@ -17,7 +17,7 @@ from ProcessorUtils import ( ) -class ProcessorAdvisory(Processor): +class 
ProcessorAdvisory(ProcessorBase): def set_component_logger(self): logger = logging.getLogger('Processor.Advisory') diff --git a/coordinator/ProcessorComponents.py b/coordinator/ProcessorComponents.py index aa8b519d4c4dd1d7e9ee381431387f83aac95fb7..268a6bd2dc27eb3a437d4c701840737ce3c58b47 100644 --- a/coordinator/ProcessorComponents.py +++ b/coordinator/ProcessorComponents.py @@ -24,8 +24,7 @@ from ProcessorUtils import ( # TODO: Replace subprocess scp and ssh commands with paramiko.SSHClient() instance -logger = logging.getLogger('Processor.Components') -add_filters_to_sublogger(logger) +logger = logging.getLogger(__name__) # get path to this script script_path = os.path.dirname(__file__)+'/' diff --git a/coordinator/ProcessorDeposition.py b/coordinator/ProcessorDeposition.py index a3569dd8e28aef90675c978a6e28fe4d0067a75d..3e47683dd6f5eb46e1722c7ee0d832c63e0c41a5 100644 --- a/coordinator/ProcessorDeposition.py +++ b/coordinator/ProcessorDeposition.py @@ -10,17 +10,18 @@ from string import Template import iris from iris.cube import CubeList -from Processor import Processor from ProcessorUtils import ( get_only_existing_globs, subprocess_and_log, add_filters_to_sublogger, ) +from coordinator import ProcessorUtils +from coordinator.processor_base import ProcessorBase from ews_postprocessing.deposition.deposition_post_processor import DepositionPostProcessor logger = logging.getLogger(__name__) -class ProcessorDeposition(Processor): +class ProcessorDeposition(ProcessorBase): """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ @@ -39,6 +40,7 @@ class ProcessorDeposition(Processor): def __init__(self) -> None: super().__init__() + print(__name__) """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ @@ -190,6 +192,7 @@ class ProcessorDeposition(Processor): if __name__ == '__main__': print(__name__) + print(ProcessorUtils.__name__) processor = ProcessorDeposition() processor.run_processor("Deposition") diff --git a/coordinator/ProcessorEnvironment.py b/coordinator/ProcessorEnvironment.py index 041ce5dc8841ac14eb370c431cba77f6f75f21cf..1ce3a203d09fd0b896760528c05ef3f92257e78a 100644 --- a/coordinator/ProcessorEnvironment.py +++ b/coordinator/ProcessorEnvironment.py @@ -7,7 +7,7 @@ import logging from pathlib import Path import os -from Processor import Processor +from processor_base import ProcessorBase from ProcessorServer import ( get_data_from_server, ) @@ -20,8 +20,9 @@ from ProcessorUtils import ( short_name, add_filters_to_sublogger ) +logger = logging.getLogger(__name__) -class ProcessorEnvironment(Processor): +class ProcessorEnvironment(ProcessorBase): def set_component_logger(self): logger = logging.getLogger('Processor.Environment') @@ -45,7 +46,7 @@ class ProcessorEnvironment(Processor): def process_in_job_env2_0(self, jobPath,status,config,component) -> dict: '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.''' - self.logger.info('started process_in_job_env2_0()') + logger.info('started process_in_job_env2_0()') # Initialise output variable proc_out = {} @@ -54,7 +55,7 @@ class ProcessorEnvironment(Processor): # Processing files available for clearing proc_out['clearup'] = [] - self.logger.info('Copying file from remote server to job directory') + logger.info('Copying file from remote server to job directory') data_result = get_data_from_server(jobPath,config,component) @@ -65,17 +66,17 @@ class ProcessorEnvironment(Processor): region = config['RegionName'] - self.logger.info(f"Calling environmental suitability 2.0 for {region} so wait for 
output to appear") + logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear") pipeline_config = config["Environment"] try: #todo lawrence comment this back to original (extracted=False) esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False) except: - self.logger.exception(f"Some failure when running EnvSuitPipeline.py") + logger.exception(f"Some failure when running EnvSuitPipeline.py") raise - self.logger.info('Finished running environmental suitability 2.0') + logger.info('Finished running environmental suitability 2.0') # TODO: Check that the output appears as expected @@ -94,13 +95,13 @@ class ProcessorEnvironment(Processor): dir_dst = f"{jobPath}/processed/" - self.logger.info(f"Copying from {dir_src}") + logger.info(f"Copying from {dir_src}") - self.logger.info(f"to {dir_dst}") + logger.info(f"to {dir_dst}") copy_tree(dir_src,dir_dst) - self.logger.info('Copying complete') + logger.info('Copying complete') proc_out = {} # Output files available for upload @@ -126,7 +127,7 @@ class ProcessorEnvironment(Processor): '''Configures the plotting arguments and calls EWS-plotting as a python module. Returns a list of output files for transfer.''' - self.logger.info('started process_EWS_plotting_env2_0()') + logger.info('started process_EWS_plotting_env2_0()') main_region = config['RegionName'] @@ -151,7 +152,7 @@ class ProcessorEnvironment(Processor): # Note that this runs all disease types available - self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}") + logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}") env_suit_processor = EnvSuitPostProcessor() env_suit_processor.set_param_config_files(sys_params_file_arg = sys_config, @@ -190,7 +191,7 @@ class ProcessorEnvironment(Processor): # check there is some output from EWS-plotting if not ews_plotting_output_globs: - self.logger.error('EWS-Plotting did not produce any output') + logger.error('EWS-Plotting did not produce any output') raise RuntimeError # provide list for transfer diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py index d0476bf5250305343a198d5fe211b9d8ede489d6..6fa843d41e80cfc77323e0012d312cbc5f026e2f 100644 --- a/coordinator/ProcessorEpidemiology.py +++ b/coordinator/ProcessorEpidemiology.py @@ -22,7 +22,7 @@ from EpiModel import ( # created by rs481 plotRaster ) from EpiModel.EpiPrep import prep -from Processor import Processor +from processor_base import ProcessorBase from ews_postprocessing.epi.epi_post_processor import EPIPostPostProcessor from ProcessorUtils import ( @@ -35,7 +35,7 @@ from ProcessorUtils import ( disease_latin_name_dict ) -class ProcessorEpidemiology(Processor): +class ProcessorEpidemiology(ProcessorBase): def set_component_logger(self): logger = logging.getLogger('Processor.Epi') diff --git a/coordinator/ProcessorScraper.py b/coordinator/ProcessorScraper.py index 1b2d9336a40dcebcebb8afe9ab4dfdfa6f43314e..5e89c07c56d5ef72636e62275508cd5f21727296 100644 --- a/coordinator/ProcessorScraper.py +++ b/coordinator/ProcessorScraper.py @@ -22,7 +22,7 @@ import certifi from numpy import where from pandas import concat, DataFrame, read_csv, Series, set_option -from Processor import Processor +from processor_base import ProcessorBase # gitlab projects # TODO: Package these projects so they are robust for importing @@ -30,7 +30,7 @@ from flagdir import jobStatus # created by jws52 from ProcessorUtils 
import add_filters_to_sublogger -class ProcessorScraper(Processor): +class ProcessorScraper(ProcessorBase): """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ diff --git a/coordinator/ProcessorServer.py b/coordinator/ProcessorServer.py index 2e13b03f531df4a07e753a695be0066052fa0d06..82a3b39e5149f764a497ba8571cb08091ee91e1f 100644 --- a/coordinator/ProcessorServer.py +++ b/coordinator/ProcessorServer.py @@ -20,8 +20,7 @@ from ProcessorUtils import ( subprocess_and_log ) -logger = logging.getLogger('Processor.Server') -add_filters_to_sublogger(logger) +logger = logging.getLogger(__name__) # def process_pre_job_server_download(input_args: dict): # '''This is set up for environmental suitability v2.0 and deposition. diff --git a/coordinator/ProcessorSurveyUtils.py b/coordinator/ProcessorSurveyUtils.py index 92be9e75736307bc22d6d3ba83bbb42715ed0478..4b5759bda06e712c1a85ee1271ec7a98264b8d29 100644 --- a/coordinator/ProcessorSurveyUtils.py +++ b/coordinator/ProcessorSurveyUtils.py @@ -7,12 +7,7 @@ import re from pandas import Series, DataFrame, concat -from ProcessorUtils import ( - add_filters_to_sublogger, -) - -logger = logging.getLogger('Processor.Surveys.Utils') -add_filters_to_sublogger(logger) +logger = logging.getLogger(__name__) #parse columns into ODK format def parse_location_str(location_str): diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py index a8b563d6dbfbe76104fd249a7ebd1bb2e62f2ea3..558fb7768dd68b5a21e81e2962f7d5a6e23e8333 100644 --- a/coordinator/ProcessorSurveys.py +++ b/coordinator/ProcessorSurveys.py @@ -24,36 +24,31 @@ Groups can be ignored by defining "GroupsToIgnore" in the config. import csv import datetime -from glob import glob import json import logging +from glob import glob from pathlib import Path -import os +from shutil import copyfile from numpy import all as np_all from numpy import any as np_any -from shutil import copyfile from pandas import read_csv, concat -from Processor import Processor -from source_gen.clustering import run_case, logit - from ProcessorSurveysODK import get_ODK_form_as_csv from ProcessorSurveysODKSA import get_ODK_SA_form_as_csv from ProcessorSurveysODKv2 import get_ODKv2_form_as_csv -from ProcessorSurveysnewODK import get_newODK_form_as_csv -from ProcessorSurveysnewODK2 import get_newODK2_form_as_csv -from ProcessorSurveyskobotoolbox import get_kobotoolbox_form_as_csv from ProcessorSurveysWRSIS import get_WRSIS_form_as_csv from ProcessorSurveysWRT import get_WRT_form_as_csv -from ProcessorUtils import ( - subprocess_and_log, - endJob, - add_filters_to_sublogger, -) +from ProcessorSurveyskobotoolbox import get_kobotoolbox_form_as_csv +from ProcessorSurveysnewODK import get_newODK_form_as_csv +from ProcessorSurveysnewODK2 import get_newODK2_form_as_csv +from processor_base import ProcessorBase +from source_gen.clustering import run_case, logit + +logger = logging.getLogger(__name__) -class ProcessorSurveys(Processor): +class ProcessorSurveys(ProcessorBase): def set_component_logger(self): # logger = logging.getLogger('Processor.Surveys') @@ -89,16 +84,16 @@ class ProcessorSurveys(Processor): # def process_pre_job_survey(self, input_args): # '''Returns a boolean as to whether the job is ready for full processing.''' - # self.logger.info('started process_pre_job_survey(), nothing to do') + # logger.info('started process_pre_job_survey(), nothing to do') # # return True def process_in_job_survey(self, jobPath,status,config,component): logit() - self.logger.info('started process_in_job_survey()') + 
logger.info('started process_in_job_survey()') - self.logger.debug('Performing download(s) from ODK server') + logger.debug('Performing download(s) from ODK server') credentials_filename = config['Survey']['ServerCredentialsFile'] with open(credentials_filename) as credentials_file: @@ -110,7 +105,7 @@ class ProcessorSurveys(Processor): csv_filenames = {} for form in cred['forms']: - self.logger.debug(f"Starting to download {form['form_id']}") + logger.debug(f"Starting to download {form['form_id']}") assert form['type'] in self.GET_FORM_AS_CSV_DICT @@ -197,7 +192,7 @@ class ProcessorSurveys(Processor): standard_columns = dfi.columns.tolist() dfm = dfi - self.logger.debug(f"First processed form contains {dfm.shape[0]} records") + logger.debug(f"First processed form contains {dfm.shape[0]} records") first=False continue @@ -206,7 +201,7 @@ class ProcessorSurveys(Processor): # and fill missing columns with empty strings dfi = dfi.reindex(standard_columns,fill_value='',axis='columns') - self.logger.debug(f"Next processed form contains {dfi.shape[0]} records") + logger.debug(f"Next processed form contains {dfi.shape[0]} records") dfm = concat([dfm,dfi],axis='rows') @@ -216,7 +211,7 @@ class ProcessorSurveys(Processor): forms_fn = f"{Export_csv_path}/Merged_SurveyData.csv" dfm.to_csv(forms_fn,index=False,quoting=csv.QUOTE_MINIMAL) - self.logger.debug(f"Preparing to apply removals and additions to survey data") + logger.debug(f"Preparing to apply removals and additions to survey data") processed_surveys_filepath = f"{Export_csv_path}/Processed_SurveyData.csv" @@ -228,12 +223,12 @@ class ProcessorSurveys(Processor): if dfm['KEY'].unique().size != dfm['KEY'].size: status.reset('WARNING') - self.logger.warning(f"KEY column is not unique, removing duplicates") + logger.warning(f"KEY column is not unique, removing duplicates") # count the number of duplicates n_duplicates = dfm.shape[0] - dfm['KEY'].unique().size # drop the duplicates dfm = dfm.drop_duplicates(keep='first') - self.logger.warning(f"Removed {n_duplicates} duplicates") + logger.warning(f"Removed {n_duplicates} duplicates") df_rm = read_csv(survey_errors_to_remove_filepath,dtype='str') keys_to_rm = df_rm['KEY'] @@ -244,22 +239,22 @@ class ProcessorSurveys(Processor): n_rm_keys = rm_keys_found.size if not np_all(rm_keys_found): # this might happen if the run date is in the past - self.logger.warning(f"Only found {n_rm_keys_found} of {n_rm_keys} survey errors to remove") + logger.warning(f"Only found {n_rm_keys_found} of {n_rm_keys} survey errors to remove") rm_keys_not_found = df_rm[~rm_keys_found] - self.logger.debug(f"Erroneous entries not found are:\n{rm_keys_not_found}") + logger.debug(f"Erroneous entries not found are:\n{rm_keys_not_found}") - self.logger.debug(f"Type of keys that can be found include:\n{dfm['KEY'].dtype}") + logger.debug(f"Type of keys that can be found include:\n{dfm['KEY'].dtype}") dfm_short_keys = [val for val in dfm['KEY'].values if len(str(val)) <10] - self.logger.debug(f"Keys that can be found include:\n{dfm_short_keys}") + logger.debug(f"Keys that can be found include:\n{dfm_short_keys}") # identify which surveys to remove idx_to_rm = dfm['KEY'].apply(lambda cell: cell in keys_to_rm.values) #drop them in-place dfm = dfm[~idx_to_rm] - self.logger.info(f"Removed {n_rm_keys_found} erroneous surveys") + logger.info(f"Removed {n_rm_keys_found} erroneous surveys") # add the extra entries df_add = read_csv(survey_additions_filepath,dtype='str') @@ -267,12 +262,12 @@ class ProcessorSurveys(Processor): df_join = 
concat([dfm,df_add]) assert dfm.shape[0]+df_add.shape[0] == df_join.shape[0], 'Unexpected result of including additional surveys' - self.logger.info(f"Added {n_add_keys} additional surveys") + logger.info(f"Added {n_add_keys} additional surveys") # save as processed df_join.to_csv(processed_surveys_filepath,index=False,quoting=csv.QUOTE_MINIMAL) - self.logger.debug('Preparing clustering calculation') + logger.debug('Preparing clustering calculation') date = datetime.datetime.now() @@ -284,7 +279,7 @@ class ProcessorSurveys(Processor): if 'Groups' in config['Survey']: # if 'Groups' is defined in the config, create grouped survey files and run python version - self.logger.info('Preparing grouped survey files') + logger.info('Preparing grouped survey files') group_directory = f"{jobPath}/Groups" Path(group_directory).mkdir(parents=True, exist_ok=True) @@ -295,11 +290,11 @@ class ProcessorSurveys(Processor): assert all([group_by in df_join.columns for group_by in config['Survey']['GroupBy']]), 'Column(s) requested in GroupBy are not present in the dataframe' for group_by in config['Survey']['GroupBy']: - self.logger.debug(f"grouping by {group_by}") + logger.debug(f"grouping by {group_by}") # handle NaN values if df_join[group_by].isna().any(): - self.logger.warning(f"Grouping by {group_by} contains NaN values. Filling NaN values with 'Unknown'") + logger.warning(f"Grouping by {group_by} contains NaN values. Filling NaN values with 'Unknown'") df_join[group_by] = df_join[group_by].fillna('Unknown') groups_list = df_join[group_by].unique() @@ -319,17 +314,17 @@ class ProcessorSurveys(Processor): if '+' in group_name: group_name_separated = group_name.split('+') if all([i in groups_to_ignore for i in group_name_separated]): - self.logger.debug(f"Adding group {group_name} to list of groups to ignore") + logger.debug(f"Adding group {group_name} to list of groups to ignore") groups_to_ignore.append(group_name) # TODO: rename group based on the unique element for group_name in groups_to_ignore: if group_name in groups: - self.logger.debug(f"Removing group {group_name} from list of groups") + logger.debug(f"Removing group {group_name} from list of groups") del groups[group_name] for group_name,group_content in groups.items(): - self.logger.info(f"Creating survey group {group_name} which includes {group_content}") + logger.info(f"Creating survey group {group_name} which includes {group_content}") # applying the grouping, keeping only the surveys that have the elements listed in the group df_group = df_join @@ -358,23 +353,23 @@ class ProcessorSurveys(Processor): day_offsets = [-2,9], output_dir = output_directory) - self.logger.debug('Placing copy of result in job directory with conventional name') + logger.debug('Placing copy of result in job directory with conventional name') output_filename = f"sources_{group_name}_{config['StartString']}.csv" output_path = f"{jobPath}/upload/{output_filename}" - self.logger.debug(f"as {output_path}") + logger.debug(f"as {output_path}") copyfile(sources_path, output_path) # THIS CAN BE REMOVED ONCE THE GROUPS ARE PROPERLY PICKED UP BY THE METOFFICE if (group_name == 'PROD'): - self.logger.debug('Additionally placing copy of PROD result in job directory without group name') + logger.debug('Additionally placing copy of PROD result in job directory without group name') output_filename = f"sources_{config['StartString']}.csv" output_path = f"{jobPath}/upload/{output_filename}" - self.logger.debug(f"as {output_path}") + logger.debug(f"as {output_path}") 
copyfile(sources_path, output_path) else: @@ -397,12 +392,12 @@ class ProcessorSurveys(Processor): day_offsets = [-2,9], output_dir = output_directory) - self.logger.debug('Placing copy of result in job directory with conventional name') + logger.debug('Placing copy of result in job directory with conventional name') output_filename = f"sources_{config['StartString']}.csv" output_path = f"{jobPath}/upload/{output_filename}" - self.logger.debug(f"as {output_path}") + logger.debug(f"as {output_path}") copyfile(sources_path, output_path) @@ -425,7 +420,7 @@ class ProcessorSurveys(Processor): def process_EWS_plotting_survey(self, jobPath,config): '''Returns a list of output files for transfer.''' - self.logger.info('started process_EWS_plotting_survey(), nothing to do') + logger.info('started process_EWS_plotting_survey(), nothing to do') pass return [] diff --git a/coordinator/ProcessorSurveysODK.py b/coordinator/ProcessorSurveysODK.py index 345a76306d0fc1601af56936fb5f287005a5ae17..d81777ca5f782985d8a2f3a5b872a24fa7f3ce8b 100644 --- a/coordinator/ProcessorSurveysODK.py +++ b/coordinator/ProcessorSurveysODK.py @@ -16,8 +16,7 @@ from ProcessorUtils import ( add_filters_to_sublogger, ) -logger = logging.getLogger('Processor.Surveys.ODK') -add_filters_to_sublogger(logger) +logger = logging.getLogger(__name__) def get_ODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, status): '''Given a dict with a single ODK form to download from an ODK Aggregate diff --git a/coordinator/ProcessorSurveysODKSA.py b/coordinator/ProcessorSurveysODKSA.py index 354c0dec08b36c4f1df6742dc679462b4239cfdc..2d2f9e5803c6496a3a62cd9a20b04d213b789218 100644 --- a/coordinator/ProcessorSurveysODKSA.py +++ b/coordinator/ProcessorSurveysODKSA.py @@ -20,8 +20,7 @@ from ProcessorUtils import ( add_filters_to_sublogger, ) -logger = logging.getLogger('Processor.Surveys.ODK') -add_filters_to_sublogger(logger) +logger = logging.getLogger(__name__) # keys are column names in the input dataframe # values that are None mean they should be dropped diff --git a/coordinator/ProcessorSurveysODKv2.py b/coordinator/ProcessorSurveysODKv2.py index 45f4429e039f5f902eb49c78a82a8bb3cb99221c..fb1ba8ec1ba941b57667b579268a48f18fdbea52 100644 --- a/coordinator/ProcessorSurveysODKv2.py +++ b/coordinator/ProcessorSurveysODKv2.py @@ -20,8 +20,7 @@ from ProcessorUtils import ( add_filters_to_sublogger, ) -logger = logging.getLogger('Processor.Surveys.ODKv2') -add_filters_to_sublogger(logger) +logger = logging.getLogger(__name__) def get_from_file( file_path: str, diff --git a/coordinator/ProcessorSurveysWRSIS.py b/coordinator/ProcessorSurveysWRSIS.py index 8bee755841078d2e34004282b2983db82fb05f56..22667577fb4f7031d88715665a07f14524b8c21c 100644 --- a/coordinator/ProcessorSurveysWRSIS.py +++ b/coordinator/ProcessorSurveysWRSIS.py @@ -11,14 +11,11 @@ import requests from shutil import copyfile from pandas import json_normalize -from ProcessorSurveyUtils import parse_columns -from ProcessorUtils import ( - endJob, - add_filters_to_sublogger, -) - -logger = logging.getLogger('Processor.Surveys.WRSIS') -add_filters_to_sublogger(logger) +from coordinator.ProcessorSurveyUtils import parse_columns +from coordinator.ProcessorUtils import endJob + + +logger = logging.getLogger(__name__) def get_from_WRSIS(form_credentials: dict, startDate: str, endDate: str): '''Make the HTTP request, then checking the status code.''' diff --git a/coordinator/ProcessorSurveysWRT.py b/coordinator/ProcessorSurveysWRT.py index 
098a5a6456e126f746a60785166545e06cf3c8f5..984de3b4de9eb2e191d3da0522682761246077e0 100644 --- a/coordinator/ProcessorSurveysWRT.py +++ b/coordinator/ProcessorSurveysWRT.py @@ -11,14 +11,10 @@ import requests from shutil import copyfile from pandas import json_normalize -from ProcessorSurveyUtils import parse_columns -from ProcessorUtils import ( - endJob, - add_filters_to_sublogger, -) - -logger = logging.getLogger('Processor.Surveys.WRT') -add_filters_to_sublogger(logger) +from coordinator.ProcessorSurveyUtils import parse_columns +from coordinator.ProcessorUtils import endJob + +logger = logging.getLogger(__name__) # Downloads of the WRT based on the ODK form reports country descriptions as two letter codes. # The reason is unclear, as viewing online shows full country names. To be diff --git a/coordinator/ProcessorSurveyskobotoolbox.py b/coordinator/ProcessorSurveyskobotoolbox.py index 1d5d96f05d2a3af1c1f5e5167a41f12f23ec8dc4..182e2232d119efad248f005dc64c6df4f1667963 100644 --- a/coordinator/ProcessorSurveyskobotoolbox.py +++ b/coordinator/ProcessorSurveyskobotoolbox.py @@ -17,8 +17,7 @@ from ProcessorUtils import ( add_filters_to_sublogger, ) -logger = logging.getLogger('Processor.Surveys.kobotoolbox') -add_filters_to_sublogger(logger) +logger = logging.getLogger(__name__) def get_from_kobotoolbox(url,form_id,form_token,**kwargs): '''Make the HTTP request, then checking the status code.''' diff --git a/coordinator/ProcessorSurveysnewODK.py b/coordinator/ProcessorSurveysnewODK.py index caa817c68f59cabcdf72022a3d8f556828e688bc..a275a396b520c1a15eb51424e936db235a219304 100644 --- a/coordinator/ProcessorSurveysnewODK.py +++ b/coordinator/ProcessorSurveysnewODK.py @@ -17,8 +17,7 @@ from ProcessorUtils import ( add_filters_to_sublogger, ) -logger = logging.getLogger('Processor.Surveys.kobotoolbox') -add_filters_to_sublogger(logger) +logger = logging.getLogger(__name__) # Cases to convert incident categories to expected short name category or value cases_incident = { diff --git a/coordinator/ProcessorSurveysnewODK2.py b/coordinator/ProcessorSurveysnewODK2.py index 5b40c504cbbf969d1c158e9e51f7dbf18df78b54..afb015125f3bb7e73a76eb4a64845f3057ccd675 100644 --- a/coordinator/ProcessorSurveysnewODK2.py +++ b/coordinator/ProcessorSurveysnewODK2.py @@ -6,26 +6,23 @@ import datetime import logging import os from pathlib import Path -import requests - from shutil import copyfile -from pandas import DataFrame + +import requests from ProcessorSurveyUtils import parse_columns from ProcessorSurveysnewODK import ( - cases_incident, - cases_severity, - get_from_kobotoolbox, - build_dataframe + cases_incident, + cases_severity, + get_from_kobotoolbox, + build_dataframe ) from ProcessorUtils import ( - endJob, - add_filters_to_sublogger, + endJob, ) -logger = logging.getLogger('Processor.Surveys.kobotoolbox') -add_filters_to_sublogger(logger) +logger = logging.getLogger(__name__) # Downloads of the ODK form reports country descriptions as two letter codes. # The reason is unclear, as viewing online shows full country names. 
To be diff --git a/coordinator/ProcessorUtils.py b/coordinator/ProcessorUtils.py index cbcede7547f4e191073296017de5b0eac81af1f4..17bb499af782ec0c0950cf7a67748f1a113e479c 100644 --- a/coordinator/ProcessorUtils.py +++ b/coordinator/ProcessorUtils.py @@ -5,6 +5,7 @@ import datetime import glob import json import logging +import logging.config import os import re from pathlib import Path @@ -21,6 +22,14 @@ import cf_units from flagdir import jobStatus # created by jws52 +logger = logging.getLogger(__name__) + +loglevels = {'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL} + short_name = { 'Advisory' : 'ADVISORY', 'Deposition' : 'DEPOSITION', @@ -95,9 +104,6 @@ def add_filters_to_sublogger(logger_case): h.addFilter(PasswordODKFilter()) -logger = logging.getLogger('Processor.Utils') -add_filters_to_sublogger(logger) - def open_and_check_config(configFile): logger.info("Loading config") @@ -322,32 +328,6 @@ def query_component_success(config_i,job_run: str, job_to_check: str): return True -# def query_past_successes(input_args: dict, -# component_to_query: str = 'Deposition'): -# '''Checks if deposition and environment jobs are already completed -# successfully. If not, it raises an error.''' -# -# component: str = input_args['component'] -# -# assert component_to_query in ['Deposition','Environment','Epidemiology'] -# -# # check configs can be loaded -# config_fns: List[str] = input_args['config_paths'] -# for configFile in config_fns: -# try: -# config_i = open_and_check_config(configFile) -# except: -# logger.exception(f"Failure in opening or checking config {configFile}") -# endScript(premature=True) -# -# # some config initialisation is necessary -# config_i['StartString'] = input_args['start_date'] -# -# # check if dependent job is readily available -# query_component_success(config_i,component,component_to_query) -# -# return True - def dataframe_to_series(df: pd.DataFrame) -> pd.Series: """Reformate a pandas Dataframe into a pandas Series. @@ -512,4 +492,72 @@ def create_nc_file(s: pd.Series, filename: str, data_attr: dict, compression_typ ncFile.close() logger.info(f"{filename} netCDF4 file created.") - return True \ No newline at end of file + return True + +def setup_logging(job_file_path: str, is_live: bool, log_level: str): + + """ + Sets up the logging configuration for the project by loading the config JSON file (specified by the + 'LOGGING_CONFIG' environment variable). + :return: + """ + + assert 'LOGGING_CONFIG' in os.environ + logging_config_env = os.environ['LOGGING_CONFIG'] + assert os.path.exists(logging_config_env) + + # get the email credentials file path from the environment variables + assert 'EMAIL_CRED' in os.environ + email_credential_fn = os.environ['EMAIL_CRED'] + assert os.path.exists(email_credential_fn) + + with open(email_credential_fn, 'r') as f: + gmail_config = json.load(f) + + # check contents + required_keys = ['user', 'pass', 'host', 'port', 'toaddrs'] + for required_key in required_keys: + assert required_key in gmail_config + + arg_string = ' '.join(sys.argv) + + # load logging json config file + json_file_path = logging_config_env + with open(json_file_path, 'r') as f: + log_config_dict = json.load(f) + + # replace the {arg_string} token in the logging json with the values passed in the command line. This allows the + # command line arguments to be logged. 
+ log_config_dict['formatters']['detailed']['format'] = \ + log_config_dict['formatters']['detailed']['format'].replace("{arg_string}", arg_string) + + log_config_dict['filters']['mask_passwords']['()'] = PasswordODKFilter + + log_config_dict['handlers']['handler_job']['filename'] = job_file_path + + # if there is no value set for the central project logger in the json file, set a default path. + if not log_config_dict['handlers']['handler_project']['filename']: + # todo how does this work when there are several processors running at once, i/o errors? + log_path_project = os.path.join(os.path.dirname(__file__), "logs", "log.txt") + print(f"'HANDLER_PROJECT' HAS NO 'FILENAME' SET, SETTING TO {log_path_project}") + log_config_dict['handlers']['handler_project']['filename'] = log_path_project + + if is_live: + # TODO: smtp handler can only use tls, but ssl is more secure. Look into defining/writing a suitable smtp handler + log_config_dict['handlers']['handler_buffered_email']['server'] = (gmail_config['host'], gmail_config['port']) + log_config_dict['handlers']['handler_buffered_email']['credentials'] = (gmail_config['user'], gmail_config['pass']) + log_config_dict['handlers']['handler_buffered_email']['fromaddr'] = gmail_config['user'] + log_config_dict['handlers']['handler_buffered_email']['toaddrs'] = gmail_config['toaddrs'] + else: + # if not live - we don't want to send emails + print(f"is_live = False, so removing email handler from logging config") + log_config_dict['handlers'].pop('handler_buffered_email') + for logger_key in log_config_dict['loggers'].keys(): + log_config_dict['loggers'][logger_key]['handlers'].remove('handler_buffered_email') + + assert log_level in loglevels + for logger_key in log_config_dict['loggers'].keys(): + log_config_dict['loggers'][logger_key]['level'] = loglevels.get(log_level) + + logging.config.dictConfig(log_config_dict) + logger.info("Logging setup complete") diff --git a/coordinator/Processor.py b/coordinator/processor_base.py similarity index 64% rename from coordinator/Processor.py rename to coordinator/processor_base.py index 25f379920b0012650e7e01361749de6c7326fd96..98051fbc9d25ea772d576099dda0be5051c4c420 100755 --- a/coordinator/Processor.py +++ b/coordinator/processor_base.py @@ -19,6 +19,8 @@ or:: from abc import abstractmethod, ABCMeta from typing import List, Union, Any, Dict +from coordinator import ProcessorUtils + print("Make sure to `conda activate py3EWSepi` environment!") print("Make sure that flagdir package is available (on PYTHONPATH)") @@ -27,26 +29,21 @@ import argparse import datetime import json import logging -import logging.config import os from pathlib import Path -import shutil import sys # gitlab projects from flagdir import jobStatus # created by jws52 # submodules of this project -import BufferingSMTPHandler -import ProcessorComponents from ProcessorUtils import ( append_item_to_list, clear_up, endScript, endJob, open_and_check_config, - PasswordODKFilter, - short_name + short_name, ) """ @@ -55,98 +52,15 @@ Default logger - will be overridden by the Processor.setup_logging() method when logger = logging.getLogger(__name__) -class Processor: +class ProcessorBase: __metaclass__ = ABCMeta - ### CLASS LEVEL VARIABLES - WILL BE SHARED BY ALL INSTANCES OF THIS CLASS - log_path_default = None - loglevels = None - #### - - def __init__(self) -> None: super().__init__() time_now = datetime.datetime.today() - # dateToday = timeNow.date() self.todayString = time_now.strftime('%Y%m%d') self.nowString = 
time_now.strftime('%Y%m%d-%H%M-%S') - # self.setup() - - - def setup_logging(self, job_log_file_path: str, is_live: bool): - # initialise default values for configuration - - # get the path to this script - script_path = os.path.dirname(__file__) + '/' - - coordinator_path = script_path - - # log file for all jobs - ##todo how does this work when there are several processors running at once, i.o. errors? - log_path_project = f"{coordinator_path}logs/log.txt" - - # job-specific log file will be written here until a job directory exits, when it will be moved there - # self.log_path_default = f"{coordinator_path}logs/log_{self.nowString}.txt" - - # get the email credentials file path from the environment variables - assert 'EMAIL_CRED' in os.environ - email_credential_fn = os.environ['EMAIL_CRED'] - assert os.path.exists(email_credential_fn) - - with open(email_credential_fn, 'r') as f: - gmail_config = json.load(f) - - # check contents - required_keys = ['user', 'pass', 'host', 'port', 'toaddrs'] - for required_key in required_keys: - assert required_key in gmail_config - - arg_string = ' '.join(sys.argv) - - # load json file - json_file_path = "/home/lb584/git/ews-coordinator/configs/logger/log_config.json" - with open(json_file_path, 'r') as f: - log_config_dict = json.load(f) - # - log_config_dict['formatters']['detailed']['format'] = log_config_dict['formatters']['detailed']['format'].replace("{arg_string}", arg_string) - - log_config_dict['filters']['mask_passwords']['()'] = PasswordODKFilter - - log_config_dict['handlers']['handler_rot_file']['filename'] = log_path_project - - # TODO: smtp handler can only use tls, but ssl is more secure. Look into defining/writing a suitable smtp handler - log_config_dict['handlers']['handler_buffered_email']['server'] = (gmail_config['host'], gmail_config['port']) - log_config_dict['handlers']['handler_buffered_email']['credentials'] = (gmail_config['user'], gmail_config['pass']) - log_config_dict['handlers']['handler_buffered_email']['fromaddr'] = gmail_config['user'] - log_config_dict['handlers']['handler_buffered_email']['toaddrs'] = gmail_config['toaddrs'] - - logging.config.dictConfig(log_config_dict) - - self.loglevels = {'debug': logging.DEBUG, - 'info': logging.INFO, - 'warning': logging.WARNING, - 'error': logging.ERROR, - 'critical': logging.CRITICAL, - } - - def set_job_file_logger(self, job_log_file_path: str): - job_file_handler = logging.FileHandler(job_log_file_path, mode = 'a') - job_file_handler.setLevel(logging.INFO) - job_file_handler.setFormatter(logging.Formatter('"%(name)s : %(levelname)s - %(message)s"')) - job_file_handler.addFilter(PasswordODKFilter()) - job_file_handler.set_name('handler_file') - logger.addHandler(job_file_handler) - # todo we can get rid of this once we make all ews packages share a common namespace - logging.getLogger("root").addHandler(job_file_handler) - - - def set_is_logging_live(self, is_live: bool): - if not is_live: - # remove the log handler that would send emails - logger.handlers = [h for h in logger.handlers if - not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)] - def parse_and_check_args(self) -> dict: @@ -250,15 +164,6 @@ class Processor: return dictionary - def set_log_level(self, log_level: str): - new_log_level_code = self.loglevels[log_level] - - # modify log level of all loggers - logger.info(f"logging level being changed to {new_log_level_code} ({log_level}) because of command-line option") - loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict] - 
for logger_i in loggers: logger_i.setLevel(new_log_level_code) - - def build_universal_config(self, configs: list, component: str, universal_config = None) -> dict: '''This config obtains aspects of each dict in configs that must be common to them all. ''' @@ -267,9 +172,6 @@ class Processor: if not universal_config: universal_config = { 'WorkspacePathout': set(), - # 'ProcessPreJob': set(), - # 'ProcessInJob': set(), - # 'ProcessEWSPlotting': set(), 'ServerPath': set(), 'ServerName': set(), 'ServerKey': set()} @@ -334,7 +236,6 @@ class Processor: config_paths: List[str] = args['config_paths'] component: str = args['component'] start_date: str = args['start_date'] - noupload: bool = args['noupload'] clearup: bool = args['clearup'] # load universal configuration @@ -346,14 +247,13 @@ class Processor: job_path: str = f'{workspacePath}{short_name[component]}_{start_date}' self.prepare_job_directory(job_path) - log_file_path = f"{job_path}/log.txt" is_live: bool = args["live"] - logger.info(f"this is before the path is set {log_file_path}") - self.setup_logging(log_file_path, is_live) - self.set_job_file_logger(log_file_path) - self.set_is_logging_live(is_live) + log_file_path = f"{job_path}/log.txt" - self.set_log_level(args['log_level']) + """ + Now we have the job_dir defined, we can set up the logging + """ + ProcessorUtils.setup_logging(log_file_path, is_live, args['log_level']) logger.info("==========") logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}") @@ -361,13 +261,6 @@ class Processor: logger.info(f"Job path will be {job_path}") workspacePath = universal_config['WorkspacePathout'] - # process_pre_job = getattr(ProcessorComponents, universal_config['ProcessPreJob']) - # - # process_in_job = getattr(ProcessorComponents, universal_config['ProcessInJob']) - # - - # process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting']) - # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC start_time: datetime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H') start_time_string: str = start_time.strftime('%Y-%m-%d-%H%M') @@ -375,15 +268,11 @@ class Processor: # run any checks before creating a job directory # if this fails, then make a note once there is a job directory ready = self.process_pre_job(args) - # - # if not ready: - # logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it") # lock job directory status: jobStatus with jobStatus(job_path) as status: - # lawrence comment in/out # check for a status file in job directory if status.had_initial_status: logger.info(f"Job path already exists and has status {status.status}") @@ -392,8 +281,6 @@ class Processor: logger.info(f"Current status of job directory is {status.status}") - # self.move_default_logfile_handler(dstPathName = logPathJob) - # make a record if process_pre_job failed if not ready: logger.error( @@ -402,9 +289,6 @@ class Processor: endJob(status, ignore_inprogress = True, premature = False) - # files and directories that will be uploaded to public server - # files_to_send = [] - # files and directories that will be earmarked for removal after a # successful job paths_to_clear = [] @@ -454,13 +338,6 @@ class Processor: 'output': None, 'clearup': None} - # if 'output' in proc_out.keys(): - # append_item_to_list( - # proc_out['output'], - # files_to_send, - # proc_description, - # status) - if 'clearup' in proc_out.keys(): append_item_to_list( 
proc_out['clearup'], @@ -470,10 +347,9 @@ class Processor: # Run EWS-plotting command - # proc_description = universal_config['ProcessEWSPlotting'] proc_description = 'ProcessEWSPlotting' try: - ews_plotting_outputs = self.process_post_job(job_path, configjson) + self.process_post_job(job_path, configjson) except: logger.exception(f"Error in {proc_description}()") status.reset('ERROR') @@ -481,53 +357,8 @@ class Processor: logger.info('Finished with EWS-Plotting, appending images to list for transfer') - # if ews_plotting_outputs: - # append_item_to_list( - # ews_plotting_outputs, - # files_to_send, - # proc_description, - # status) - logger.info(f'Finished with config {configIndex + 1} of {config_paths_length}') - # # send results to remote server - # - # if not noupload: - # try: - # ProcessorComponents.upload(universal_config, files_to_send, component) - # except IndexError: - # status.reset('WARNING') - # - # except: - # logger.exception('Failed to upload files to remote server') - # status.reset('ERROR') - # endJob(status, premature = True) - # - # # check if there is a second location on willow to provide results - # if 'ServerPathExtra' in configjson[component]: - # - # logger.info('There is an extra path to send results to:') - # - # extra_path = configjson[component]['ServerPathExtra'] - # - # logger.info(extra_path) - # - # universal_config_extra = universal_config.copy() - # universal_config_extra['ServerPath'] = extra_path - # - # try: - # ProcessorComponents.upload(universal_config_extra, files_to_send, component) - # except IndexError: - # status.reset('WARNING') - # - # except: - # logger.exception('Failed to upload files to extra directory on remote server') - # status.reset('ERROR') - # endJob(status, premature = True) - # - # else: - # logger.info('Because noupload argument was present, not sending results to remote server') - status.reset('SUCCESS') if status.is_success() & (clearup is True): @@ -563,8 +394,8 @@ class Processor: raise NotImplementedError def run_processor(self, component: str): - print("Make sure to `conda activate py3EWSepi` environment!") - print("Make sure that flagdir package is available (on PYTHONPATH)") + logger.info("Make sure to `conda activate py3EWSepi` environment!") + logger.info("Make sure that flagdir package is available (on PYTHONPATH)") try: args: dict = self.parse_and_check_args() args["component"] = component @@ -573,8 +404,7 @@ class Processor: except SystemExit as e: print("caught with code " + str(e.code)) - if logger: - logger.info('run_process() exited') + logger.info('run_process() exited') sys.exit(e.code) except: raise Exception('Uncaught exception in run_Process:') diff --git a/scripts/run_utils.sh b/scripts/run_utils.sh index d390026667db11e696be35cf0c748398135f847a..f1d875e43ff8a1776f8be101d5d64e717fdc651f 100755 --- a/scripts/run_utils.sh +++ b/scripts/run_utils.sh @@ -8,7 +8,7 @@ function setup_conda_env() { bin=/storage/app/EWS_prod/code/ # provide custom python packages so they can be imported - coordinator=${bin}/coordinator/coordinator/ + coordinator=${bin}/coordinator/ flagdir=${bin}/flagdir/ epimodel=${bin}/epimodel/ advisory=${bin}/advisory_builder/ @@ -22,6 +22,7 @@ function setup_conda_env() { # provide path to email credentials for logging + export LOGGING_CONFIG=${bin}/coordinator/configs/logger/log_config.json export EMAIL_CRED=${envs}/credentials/Cred_gmail.json # activate venv environment of python modules so they can be imported
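For reference, a minimal usage sketch of the new logging setup introduced by this change. All paths below are illustrative placeholders (the real values come from run_utils.sh and the job directory created in ProcessorBase.run_process), and it assumes the coordinator package is importable as shown in processor_base.py.

    # Minimal sketch of exercising ProcessorUtils.setup_logging() directly.
    # Placeholder paths only; normally run_utils.sh exports these variables
    # and run_process() supplies the job log path.
    import os
    import logging

    # copy of configs/logger/template_log_config.json with any project-level
    # settings filled in, plus the gmail credentials JSON (user/pass/host/port/toaddrs)
    os.environ['LOGGING_CONFIG'] = '/path/to/your/log_config.json'
    os.environ['EMAIL_CRED'] = '/path/to/Cred_gmail.json'

    from coordinator import ProcessorUtils

    # job_file_path is the per-job log file (its directory must already exist);
    # is_live=False drops the buffered-email handler from the dictConfig;
    # log_level must be a key of ProcessorUtils.loglevels ('debug' ... 'critical')
    ProcessorUtils.setup_logging(job_file_path='/path/to/job_dir/log.txt',
                                 is_live=False,
                                 log_level='info')

    # modules now log through module-level loggers; children of the 'coordinator'
    # logger pick up the handlers declared in the JSON template
    logging.getLogger('coordinator.example').info('logging configured')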