From 40eb20b2985b2d0a5bcf64f5407c7fdc3500b802 Mon Sep 17 00:00:00 2001
From: lb584 <lb584@cam.ac.uk>
Date: Fri, 5 Apr 2024 14:57:00 +0100
Subject: [PATCH] initial commit to see if the tests work on the server

---
 coordinator/Processor.py                      | 138 +++++++++++-------
 coordinator/ProcessorAdvisory.py              |  43 +-----
 coordinator/ProcessorComponents.py            |  10 +-
 coordinator/ProcessorDeposition.py            |   7 +-
 coordinator/ProcessorEnvironment.py           |   8 +-
 coordinator/ProcessorEpidemiology.py          |   7 +-
 coordinator/ProcessorScraper.py               |   9 +-
 coordinator/ProcessorSurveys.py               |  19 ++-
 .../partial/integration_test_utils.py         |  18 ++-
 9 files changed, 144 insertions(+), 115 deletions(-)
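
Logging is no longer configured from Processor.__init__(): run_process() now
derives the job directory up front and hands a log path inside it to
setup_logging(), which replaces the old "log to a default file, then move it
into the job directory" step. A minimal sketch of the resulting call order,
assuming ProcessorDeposition as the concrete subclass and a WorkspacePathout
that ends in a path separator (run_process() performs these steps itself; the
driver below is illustrative only):

    from ProcessorDeposition import ProcessorDeposition
    from ProcessorUtils import short_name

    processor = ProcessorDeposition()            # any concrete Processor subclass
    args = processor.parse_and_check_args()      # plain print() until logging exists
    args["component"] = "Deposition"

    universal_config = processor.build_universal_config(
            args["config_paths"], args["component"])
    job_path = (universal_config['WorkspacePathout']
                + f"{short_name[args['component']]}_{args['start_date']}")

    processor.prepare_job_directory(job_path)    # archive any rerun, then mkdir
    processor.setup_logging(f"{job_path}/log.txt",
                            is_live = args["live"])   # non-live drops the e-mail handler
    processor.set_log_level(args["log_level"])
    processor.set_component_logger()             # subclass attaches its sublogger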

diff --git a/coordinator/Processor.py b/coordinator/Processor.py
index 3ce7217..ffcf3f2 100755
--- a/coordinator/Processor.py
+++ b/coordinator/Processor.py
@@ -17,7 +17,7 @@ or::
     $ ./run_Processor.sh -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715
 '''
 from abc import abstractmethod, ABCMeta
-from typing import List, Union, Any
+from typing import List, Union, Any, Dict
 
 
 print("Make sure to `conda activate py3EWSepi` environment!")
@@ -62,29 +62,33 @@ class Processor:
 
     def __init__(self) -> None:
         super().__init__()
-        self.setup()
+        time_now = datetime.datetime.today()
+        self.todayString = time_now.strftime('%Y%m%d')
+        self.nowString = time_now.strftime('%Y%m%d-%H%M-%S')
+        # self.setup()
 
 
-    def setup(self):
+    def setup_logging(self, job_log_file_path: str, is_live: bool):
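+        """
+        Configure the job-level log handlers: a file handler appending to
+        job_log_file_path (whose parent directories are created below), and,
+        only when is_live is True, the buffered e-mail handler for errors.
+        Called from run_process() once the job directory is known.
+        """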
         # initialise default values for configuration
 
         script_name = 'Processor'
 
-        timeNow = datetime.datetime.today()
-        dateToday = timeNow.date()
-        self.todayString = timeNow.strftime('%Y%m%d')
-        self.nowString = timeNow.strftime('%Y%m%d-%H%M-%S')
-
         # get the path to this script
         script_path = os.path.dirname(__file__) + '/'
 
         coordinator_path = script_path
 
         # log file for all jobs
+        # TODO: how does this work when several processors run at once, e.g. I/O errors on the shared log file?
         log_path_project = f"{coordinator_path}logs/log.txt"
 
+        # make the job_log_file_path parent dirs if it does not exist
+        job_log_file_parent_dirs = os.path.dirname(job_log_file_path)
+        Path(job_log_file_parent_dirs).mkdir(parents=True, exist_ok=True)
+
         # job-specific log file will be written here until a job directory exists, when it will be moved there
-        self.log_path_default = f"{coordinator_path}logs/log_{self.nowString}.txt"
+        # self.log_path_default = f"{coordinator_path}logs/log_{self.nowString}.txt"
 
         # get the email credentials file path from the environment variables
         assert 'EMAIL_CRED' in os.environ
@@ -149,7 +153,7 @@ class Processor:
                     'level':     'INFO',
                     'formatter': 'simple',
                     'filters':   ['mask_passwords'],
-                    'filename':  self.log_path_default,
+                    'filename':  job_log_file_path,
                     'mode':      'a',  # 'a' for append
                 },
                 # to email errors to maintainers
@@ -192,6 +196,10 @@ class Processor:
         # create a logger named according to how the file is called
         # logger = logging.getLogger(__name__)
         self.logger = logging.getLogger(script_name)
+        if not is_live:
+            # remove the log handler that would send emails
+            self.logger.handlers = [h for h in self.logger.handlers if
+                                    not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]
 
         self.loglevels = {'debug':    logging.DEBUG,
                           'info':     logging.INFO,
@@ -281,7 +289,7 @@ class Processor:
         my_parser.add_argument(
                 '-l', '--loglevel',
                 action = 'store',
-                choices = list(self.loglevels.keys()),
+                choices = ["debug", "info", "warning", "error", "critical"],
                 default = 'info',
                 help = 'verbosity of log messaging (debug, info, warning, error, critical)\n default is info',
                 dest = 'log_level',  # this names the attribute that will be parsed
@@ -321,10 +329,10 @@ class Processor:
 
         # Check the args
 
-        self.logger.info(f"Command-line options are:\n{args}")
+        print(f"Command-line options are:\n{args}")
 
         if not isinstance(args.config_paths, list):
-            self.logger.error('Expecting a list of config paths')
+            print('Expecting a list of config paths')
             raise RuntimeError
 
         # check the startstring
@@ -340,24 +348,23 @@ class Processor:
                 assert provided_start_date <= today_date
 
             except (ValueError, AssertionError) as e:
-                self.logger.exception(
-                    "Provided start date string is formatted incorrectly or out of range, or end date not also defined")
-                raise
+                raise Exception(
+                    "Provided start date string is formatted incorrectly or out of range, "
+                    "or end date not also defined") from e
 
         dictionary: dict = vars(args)
         return dictionary
 
 
     def set_log_level(self, log_level: str):
-        new_log_level = self.loglevels[log_level]
+        new_log_level_code = self.loglevels[log_level]
 
         # modify log level of all self.loggers
-        self.logger.info(f"logging level being changed to {new_log_level} because of command-line option")
+        self.logger.info(f"logging level being changed to {new_log_level_code} ({log_level}) because of command-line option")
         loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
-        for logger_i in loggers: logger_i.setLevel(new_log_level)
+        for logger_i in loggers: logger_i.setLevel(new_log_level_code)
 
 
-    def build_universal_config(self, configs: list, component: str, universal_config = None):
+    def build_universal_config(self, configs: list, component: str, universal_config = None) -> dict:
         '''This config obtains aspects of each dict in configs that must be common to
         them all. '''
 
@@ -407,43 +414,65 @@ class Processor:
 
         return universal_config
 
+    def prepare_job_directory(self, job_path):
+        """
+        Create the job directory; if it already exists (from a rerun), archive
+        the old one under a timestamped name first.
+
+        :param job_path: path of the job directory to create
+        """
+        if os.path.exists(job_path):
+            archived_dir_path = f"{job_path}_{self.nowString}"
+            message = f"Job path {job_path} already exists, so moving it to {archived_dir_path}"
+            print(message)
+            os.rename(job_path, archived_dir_path)
+
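+        # exist_ok=False: any pre-existing directory was archived above, so creation must succeed or fail loudly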
+        Path(job_path).mkdir(parents = True, exist_ok = False)
+
 
-    def run_process(self, args: dict):
+    def run_process(self, args: dict) -> None:
         # check initial state of each config file, and gather terms that must apply
         # across all provided configs
 
-        if not args["live"]:
-            # remove the log handler that would send emails
-            self.logger.handlers = [h for h in self.logger.handlers if
-                                    not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)]
-
         config_paths: List[str] = args['config_paths']
         component: str = args['component']
         start_date: str = args['start_date']
         noupload: bool = args['noupload']
         clearup: bool = args['clearup']
 
+        # load universal configuration
         universal_config = self.build_universal_config(config_paths, component)
-
         universal_config['StartString'] = start_date
 
-        self.logger.info(f"Universal config is\n{json.dumps(universal_config, indent = 2)}")
+        # determine job directory
+        workspacePath = universal_config['WorkspacePathout']
+        job_path: str = f'{workspacePath}{short_name[component]}_{start_date}'
+        self.prepare_job_directory(job_path)
+
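+        # the job directory now exists before logging starts, so the log file can be opened in place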
+        log_file_path = f"{job_path}/log.txt"
+        is_live: bool = args["live"]
+        self.setup_logging(log_file_path, is_live)
 
+        self.set_log_level(args['log_level'])
+        self.set_component_logger()
+
+        self.logger.info("==========")
+        self.logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
+        self.logger.info(f"Universal config is\n{json.dumps(universal_config, indent = 2)}")
+        self.logger.info(f"Job path will be {job_path}")
-        workspacePath = universal_config['WorkspacePathout']
 
         # process_pre_job = getattr(ProcessorComponents, universal_config['ProcessPreJob'])
         #
         # process_in_job = getattr(ProcessorComponents, universal_config['ProcessInJob'])
         #
-        # process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting'])
-
-        # determine job directory
-        jobPath: str = f'{workspacePath}{short_name[component]}_{start_date}'
 
-        self.logger.info(f"Job path will be {jobPath}")
+        # process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting'])
 
         # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC
-        startTime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H')
+        start_time: datetime.datetime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H')
+        start_time_string: str = start_time.strftime('%Y-%m-%d-%H%M')
 
         # run any checks before creating a job directory
         # if this fails, then make a note once there is a job directory
@@ -452,12 +481,9 @@ class Processor:
         # if not ready:
         #     self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it")
 
-        # create job directory
-        Path(jobPath).mkdir(parents = True, exist_ok = True)
-
         # lock job directory
         status: jobStatus
-        with jobStatus(jobPath) as status:
+        with jobStatus(job_path) as status:
 
             # lawrence comment in/out
             # check for a status file in job directory
@@ -468,10 +494,7 @@ class Processor:
 
             self.logger.info(f"Current status of job directory is {status.status}")
 
-            # now that we have a useable job directory, move the log file there
-            logPathJob = f"{jobPath}/log.txt"
-
-            self.move_default_logfile_handler(dstPathName = logPathJob)
+            # self.move_default_logfile_handler(dstPathName = logPathJob)
 
             # make a record if process_pre_job failed
             if not ready:
@@ -504,7 +527,7 @@ class Processor:
 
                 # provide specific case details to template config
 
-                configjson['StartTime'] = startTime.strftime('%Y-%m-%d-%H%M')
+                configjson['StartTime'] = start_time_string
                 configjson['StartString'] = start_date
 
                 # from configtemplate create configFileName to describe the specific job
@@ -514,13 +537,13 @@ class Processor:
                 configjson['ConfigFilePath'] = configFileName
 
                 # write the complete configuration file to job directory
-                with open(f"{jobPath}/{configFileName}.json", 'w') as write_file:
+                with open(f"{job_path}/{configFileName}.json", 'w') as write_file:
                     json.dump(configjson, write_file, indent = 4)
 
                 # proc_description = universal_config['ProcessInJob']
                 proc_description = 'ProcessInJob'
                 try:
-                    proc_out = self.process_in_job(jobPath, status, configjson, component)
+                    proc_out = self.process_in_job(job_path, status, configjson, component)
                 except:
                     self.logger.exception(f"Error in process_in_job")
                     status.reset('ERROR')
@@ -552,7 +575,7 @@ class Processor:
                 # proc_description = universal_config['ProcessEWSPlotting']
                 proc_description = 'ProcessEWSPlotting'
                 try:
-                    EWSPlottingOutputs = self.process_post_job(jobPath, configjson)
+                    EWSPlottingOutputs = self.process_post_job(job_path, configjson)
                 except:
                     self.logger.exception(f"Error in {proc_description}()")
                     status.reset('ERROR')
@@ -633,22 +656,27 @@ class Processor:
     def process_post_job(self, jobPath, configjson):
         raise NotImplementedError
 
+    @abstractmethod
+    def set_component_logger(self):
+        """
+        Overridden in subclasses to attach the component-specific sublogger.
+        """
+        raise NotImplementedError
 
     def run_processor(self, component: str):
         print("Make sure to `conda activate py3EWSepi` environment!")
         print("Make sure that flagdir package is available (on PYTHONPATH)")
         try:
-            self.logger.info("==========")
-            self.logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
-            # load configurations
-            args_dict: dict = self.parse_and_check_args()
-            args_dict["component"] = component
-            self.set_log_level(args_dict['log_level'])
-            self.run_process(args_dict)
+            args: dict = self.parse_and_check_args()
+            args["component"] = component
+
+            self.run_process(args)
+
         except SystemExit as e:
             print("caught with code " + str(e.code))
-            self.logger.info('run_process() exited')
+            if self.logger:
+                self.logger.info('run_process() exited')
             sys.exit(e.code)
         except:
-            self.logger.exception('Uncaught exception in run_Process:')
-            sys.exit(1)
+            raise Exception('Uncaught exception in run_Process')
diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/ProcessorAdvisory.py
index 582bd9e..75f033d 100644
--- a/coordinator/ProcessorAdvisory.py
+++ b/coordinator/ProcessorAdvisory.py
@@ -19,8 +19,14 @@ from ProcessorUtils import (
 
 class ProcessorAdvisory(Processor):
 
+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Advisory')
+        add_filters_to_sublogger(logger)
+
+
     def process_pre_job(self, args) -> bool:
-        return self.process_pre_job_advisory(args)
+        # return self.process_pre_job_advisory(args)
+        return True
 
 
     def process_in_job(self, jobPath, status, configjson, component) -> object:
@@ -33,40 +39,7 @@ class ProcessorAdvisory(Processor):
 
     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Advisory')
-        add_filters_to_sublogger(logger)
-
-    def process_pre_job_advisory(self,input_args: dict) -> bool:
 
-        # self.logger.info('started process_pre_job_advisory()')
-        #
-        # # check configs can be loaded
-        # config_fns: List[str] = input_args['config_paths']
-        # for configFile in config_fns:
-        #     try:
-        #         config_i = open_and_check_config(configFile)
-        #     except:
-        #         self.logger.exception(f"Failure in opening or checking config {configFile}")
-        #         endScript(premature=True)
-
-            # check pre-requisite jobs are complete
-
-            # Which jobs to check are defined by the config. It may include epi
-            # and surveys.
-
-            # Note that yesterday's surveys should also have succeeded, but that
-            # needs a YesterdayDateString available for the survey part of
-            # advisory config to point to the status file.
-
-            # dependent_components = config_i['Advisory'].get(
-            #         'DependentComponents',
-            #         ['Deposition','Environment'])
-
-            # for dependent_component in dependent_components:
-            #
-            #     query_past_successes(input_args, dependent_component)
-
-        return True
 
     def process_in_job_advisory(self, jobPath, status, config, component):
         '''Generates a word processor file containing some basic survey statistics
@@ -106,4 +79,4 @@ class ProcessorAdvisory(Processor):
 
 if __name__ == '__main__':
     processor = ProcessorAdvisory()
-    processor.run_processor("Advisory")
\ No newline at end of file
+    processor.run_processor("Advisory")
diff --git a/coordinator/ProcessorComponents.py b/coordinator/ProcessorComponents.py
index 53a132e..f2e8391 100644
--- a/coordinator/ProcessorComponents.py
+++ b/coordinator/ProcessorComponents.py
@@ -12,14 +12,14 @@ from typing import List
 # coordinator stages: pre, in (during) and plotting. 
 
 
-from ProcessorServer import (
-        process_pre_job_server_download,
-        upload
-)
+# from ProcessorServer import (
+#         process_pre_job_server_download,
+#         upload
+# )
 
 from ProcessorUtils import (
         add_filters_to_sublogger,
-        query_past_successes
+        # query_past_successes
 )
 
 # TODO: Replace subprocess scp and ssh commands with paramiko.SSHClient() instance
diff --git a/coordinator/ProcessorDeposition.py b/coordinator/ProcessorDeposition.py
index 63adc08..e84d5fc 100644
--- a/coordinator/ProcessorDeposition.py
+++ b/coordinator/ProcessorDeposition.py
@@ -38,10 +38,14 @@ class ProcessorDeposition(Processor):
 
     def __init__(self) -> None:
         super().__init__()
+
+    """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """
+
+
+    def set_component_logger(self):
         logger = logging.getLogger('Processor.Deposition')
         add_filters_to_sublogger(logger)
 
-    """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """
 
     def process_in_job_dep(self, jobPath, status, config, component):
         self.logger.info('started process_in_job_dep()')
@@ -185,3 +189,4 @@ class ProcessorDeposition(Processor):
 if __name__ == '__main__':
     processor = ProcessorDeposition()
     processor.run_processor("Deposition")
+
diff --git a/coordinator/ProcessorEnvironment.py b/coordinator/ProcessorEnvironment.py
index a2a34c6..c927e41 100644
--- a/coordinator/ProcessorEnvironment.py
+++ b/coordinator/ProcessorEnvironment.py
@@ -22,7 +22,11 @@ from ProcessorUtils import (
 
 
 class ProcessorEnvironment(Processor):
-    
+
+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Environment')
+        add_filters_to_sublogger(logger)
+
     def process_pre_job(self, args):
         # return process_pre_job_server_download(args)
         return True
@@ -37,8 +41,6 @@ class ProcessorEnvironment(Processor):
 
     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Environment')
-        add_filters_to_sublogger(logger)
 
     def process_in_job_env2_0(self, jobPath,status,config,component):
         '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.'''
diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py
index 2358409..b26a20d 100644
--- a/coordinator/ProcessorEpidemiology.py
+++ b/coordinator/ProcessorEpidemiology.py
@@ -36,6 +36,11 @@ from ProcessorUtils import (
 )
 
 class ProcessorEpidemiology(Processor):
+
+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Epi')
+        add_filters_to_sublogger(logger)
+
     def process_pre_job(self, args) -> bool:
         return self.process_pre_job_epi(args)
 
@@ -50,8 +55,6 @@ class ProcessorEpidemiology(Processor):
 
     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Epi')
-        add_filters_to_sublogger(logger)
 
     def process_pre_job_epi(self, input_args: dict):
         '''Returns a boolean as to whether the job is ready for full processing.'''
diff --git a/coordinator/ProcessorScraper.py b/coordinator/ProcessorScraper.py
index 3d98905..1b2d933 100644
--- a/coordinator/ProcessorScraper.py
+++ b/coordinator/ProcessorScraper.py
@@ -34,6 +34,11 @@ class ProcessorScraper(Processor):
 
     """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """
 
+
+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Scraper')
+        add_filters_to_sublogger(logger)
+
     def process_pre_job(self, args):
         return True
 
@@ -45,11 +50,9 @@ class ProcessorScraper(Processor):
 
     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Scraper')
-        add_filters_to_sublogger(logger)
 
-    """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """
 
+    """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """
 
     # date format conforms to format used in SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv
     # # ODK v1.11.2:
diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py
index a215b99..2bfffc6 100644
--- a/coordinator/ProcessorSurveys.py
+++ b/coordinator/ProcessorSurveys.py
@@ -55,8 +55,13 @@ from ProcessorUtils import (
 
 class ProcessorSurveys(Processor):
 
+    def set_component_logger(self):
+        logger = logging.getLogger('Processor.Surveys')
+        add_filters_to_sublogger(logger)
+
     def process_pre_job(self, args):
-        return self.process_pre_job_survey(args)
+        # return self.process_pre_job_survey(args)
+        return True
 
 
     def process_in_job(self, jobPath, status, configjson, component) -> object:
@@ -69,8 +74,6 @@ class ProcessorSurveys(Processor):
 
     def __init__(self) -> None:
         super().__init__()
-        logger = logging.getLogger('Processor.Surveys')
-        add_filters_to_sublogger(logger)
 
         self.GET_FORM_AS_CSV_DICT = {
             'ODK':         get_ODK_form_as_csv,
@@ -83,11 +86,11 @@ class ProcessorSurveys(Processor):
             'newODK2' :    get_newODK2_form_as_csv,
         }
 
-    def process_pre_job_survey(self, input_args):
-        '''Returns a boolean as to whether the job is ready for full processing.'''
-        self.logger.info('started process_pre_job_survey(), nothing to do')
-
-        return True
+    # def process_pre_job_survey(self, input_args):
+    #     '''Returns a boolean as to whether the job is ready for full processing.'''
+    #     self.logger.info('started process_pre_job_survey(), nothing to do')
+    #
+    #     return True
 
     def process_in_job_survey(self, jobPath,status,config,component):
         self.logger.info('started process_in_job_survey()')
diff --git a/tests/integration/partial/integration_test_utils.py b/tests/integration/partial/integration_test_utils.py
index c52a98a..0bb1966 100644
--- a/tests/integration/partial/integration_test_utils.py
+++ b/tests/integration/partial/integration_test_utils.py
@@ -10,6 +10,7 @@ from zipfile import ZipFile
 
 from HTMLTestRunner import HTMLTestRunner
 
+import ProcessorUtils
 from Processor import Processor
 
 
@@ -189,20 +190,31 @@ class IntegrationTestUtils:
 
         args_dict: dict = {}
 
+        config_paths = [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME]
+
         # note, possible to override these values in the kwargs loop below
         args_dict['live'] = False
         args_dict['noupload'] = True
         args_dict['start_date'] = start_date
         args_dict['component'] = component
-        args_dict['config_paths'] = [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME]
+        args_dict['config_paths'] = config_paths
         args_dict['log_level'] = 'info'
         args_dict['clearup'] = True
 
         for key, value in kwargs.items():
             args_dict[key] = value
 
-        log_level = args_dict['log_level']
-        processor.set_log_level(log_level)
+        # universal_config: dict = processor.build_universal_config(config_paths, component)
+        # workspacePath = universal_config['WorkspacePathout']
+        #
+        # job_path: str = f'{workspacePath}{ProcessorUtils.short_name[component]}_{start_date}'
+        # logPath = f"{job_path}/log.txt"
+        #
+        # is_live: bool = args_dict['live']
+        # processor.setup_logging(logPath, is_live)
+        #
+        # log_level = args_dict['log_level']
+        # processor.set_log_level(log_level)
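+        # run_process() now sets up the job path and logging itself, so the explicit calls sketched above stay disabled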
 
         try:
             processor.run_process(args_dict)
-- 
GitLab