From 3c6f5d175d78f2b126c4c1fae735aa5464f95b33 Mon Sep 17 00:00:00 2001
From: lb584 <lb584@cam.ac.uk>
Date: Mon, 2 Sep 2024 16:36:23 +0100
Subject: [PATCH] extracting functions to allow the ews job dir and the config
 dict to be created without needing to run the pipeline - allows epi fitting
 to retrieve these values if the pipeline has already run

---
 ews/coordinator/processor_base.py             | 174 +++++++++++-------
 ews/coordinator/processor_epidemiology.py     |  29 ++-
 ews/coordinator/utils/processor_utils.py      |   2 +-
 .../partial/integration_test_utils.py         |  13 +-
 4 files changed, 132 insertions(+), 86 deletions(-)

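Note: with build_config() and generate_job_directory_path() now exposed as static
methods on ProcessorBase, the unified config dict and the job directory path can
be derived without running the pipeline (e.g. for epi fitting against a run that
has already completed). A minimal sketch of that standalone usage follows; the
paths and args values below are placeholders (not taken from this patch), and the
sys_config json is assumed to define 'WorkspacePathout':

    from ews.coordinator.processor_base import ProcessorBase

    # minimal args; build_config() renames 'start_date' to 'StartString' and
    # merges the sys_config and run config values into a single dict
    args = {
        'start_date': '20240902',  # placeholder run date
        'short_name': 'epi',       # placeholder job short name
        'component': 'epi',
        'live': False,
        'clearup': False,
        'log_level': 'info',
    }

    config = ProcessorBase.build_config(args,
                                        config_path='configs/run_config.json',      # placeholder
                                        sys_config_path='configs/sys_config.json')  # placeholder

    # derive the job directory of the (already-run) job without re-running it
    job_path = ProcessorBase.generate_job_directory_path(config)
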
diff --git a/ews/coordinator/processor_base.py b/ews/coordinator/processor_base.py
index fb905c6..241e617 100755
--- a/ews/coordinator/processor_base.py
+++ b/ews/coordinator/processor_base.py
@@ -45,7 +45,6 @@ class ProcessorBase:
 
     def __init__(self) -> None:
         super().__init__()
-        self.config: dict = {}  # this is the config for the current job, gets set in run_process()
         time_now = datetime.datetime.today()
         self.today_string = time_now.strftime('%Y%m%d')
         self.now_string = time_now.strftime('%Y%m%d-%H%M-%S')
@@ -141,6 +140,22 @@ class ProcessorBase:
         dictionary: dict = vars(args)
         return dictionary
 
+    @staticmethod
+    def generate_job_directory_path(config: dict) -> str:
+        """
+        Generate the job directory path from the config values
+
+        :param config: unified job config; requires 'short_name', 'StartString' and 'WorkspacePathout'
+        :return: the job directory path
+        """
+        short_name: str = config['short_name']
+        start_date: str = config['StartString']
+        workspace_path = config['WorkspacePathout']
+
+        job_path: str = f'{workspace_path}{short_name}_{start_date}'
+
+        return job_path
+
     def prepare_job_directory(self, job_path: str):
         """
         create job directory or archive if already exists (due to a rerun)
@@ -158,57 +173,36 @@ class ProcessorBase:
 
 
     def run_process(self,
-                    args: dict) -> None:
-
-        # check initial state of each config file, and gather terms that must apply
-        # across all provided configs
-
-        sys_config_path: str = args['sys_config_path']
-        config_path: str = args['config_path']
-        component: str = args['component']
-        short_name: str = args['short_name']
-        start_date: str = args['start_date']
-        clearup: bool = args['clearup']
-        is_live: bool = args["live"]
-
-        # load universal configuration
-        sys_config = parse_json_file_with_tokens(sys_config_path)
-
-        # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC
-        start_time: datetime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H')
-        start_time_string: str = start_time.strftime('%Y-%m-%d-%H%M')
-        sys_config['StartTime'] = start_time_string
-        sys_config['StartString'] = start_date
-
-        # determine job directory
-        workspace_path = sys_config['WorkspacePathout']
-        job_path: str = f'{workspace_path}{short_name}_{start_date}'
+                    config: dict) -> None:
+
+        job_path: str = self.generate_job_directory_path(config)
         self.prepare_job_directory(job_path)
 
         log_file_path = f"{job_path}/log.txt"
 
         """
-        Now we have the job_dir defined, we can set up the logging
+        Now that the job_dir is defined, we can set up the logging (to log to that directory)
         """
-        if 'LoggingConfigPath' not in sys_config.keys():
-            raise Exception(f"'LoggingConfigPath' not found in sys_config. Either use the default template file in "
-                            f"configs/logger/template_log_config.json, or copy the template away and edit")
+        if 'LoggingConfigPath' not in config.keys():
+            raise Exception(f"'LoggingConfigPath' not found in config (it is set in the sys_config). Either use the "
+                            f"default template file in configs/logger/template_log_config.json, or copy the template "
+                            f"elsewhere and edit it")
 
-        log_config_path = sys_config['LoggingConfigPath']
+        log_config_path = config['LoggingConfigPath']
+        is_live: bool = config["live"]
         processor_utils.setup_logging(log_file_path,
                                       log_config_path,
                                       is_live,
-                                      args['log_level'])
+                                      config['log_level'])
 
         logger.info("==========")
         logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
-        logger.info(f"Universal config is\n{json.dumps(sys_config, indent = 2)}")
+        logger.info(f"Unified config is\n{json.dumps(config, indent = 2)}")
         logger.info(f"Job path will be {job_path}")
-        # workspace_path = sys_config['WorkspacePathout']
 
         # run any checks before creating a job directory
         # if this fails, then make a note once there is a job directory
-        ready = self.process_pre_job(args)
+        ready = self.process_pre_job(config)
 
         # lock job directory
         status: Jobstatus = Jobstatus(job_path)
@@ -232,43 +226,15 @@ class ProcessorBase:
 
             logger.info('Starting to work on each configuration')
 
+            config_path: str = config['config_path']
             logger.info(f'Working on config {config_path}')
 
-            try:
-                # read the run config json, and replace any tokens with values from the sys_config (such at the
-                # directory root for the installation)
-                config: dict = parse_json_file_with_tokens(config_path, sys_config)
-                self.config = config  # set the class variable so it can be accessed externally
-
-                # then add the sys_config keys and values to the configjson
-                for k, v in sys_config.items():
-                    if k not in config.keys():
-                        config[k] = v
-                    else:
-                        logger.warning(f"Key {k} already present in run config - not adding key with same name from "
-                                       f"the sys_config")
-
-                """
-                then add the args keys and values to the configjson, we will override any keys that are already 
-                in the configjson
-                """
-                for k, v in args.items():
-                    if k in config.keys():
-                        logger.warning(f"Key {k} already present in run config - overriding key with same name from "
-                                       f"the sys args")
-                    config[k] = v
-
-            except:
-                logger.exception(f"Failure in opening or checking config {config_path}")
-                # TODO: This case should test flagdir.jobStatus.__exit__()
-                raise  # endJob('ERROR',premature=True)
-
             # provide specific case details to template config
 
-            # from configtemplate create configFileName to describe the specific job
-            config_file_name = f"{os.path.basename(config_path).replace('.json', '')}_{component}"
-
-            config['ConfigFilePath'] = config_file_name
+            # from the config template, store the base filename (without extension)
+            config_file_name = os.path.basename(config_path)
+            config_file_name = os.path.splitext(config_file_name)[0]
+            # config['ConfigFilePath'] = config_file_name
 
             # write the complete configuration file to job directory
             with open(f"{job_path}/{config_file_name}.json", 'w') as write_file:
@@ -277,6 +243,7 @@ class ProcessorBase:
             # proc_description = universal_config['ProcessInJob']
             proc_description = 'ProcessInJob'
             try:
+                component: str = config['component']
                 proc_out: dict = self.process_in_job(job_path, status, config, component)
             except:
                 logger.exception(f"Error in process_in_job")
@@ -284,7 +251,7 @@ class ProcessorBase:
                 end_job(status, premature = True)
 
             # Set default case
-            # This would be improved by implementing a class structure - not ethat the proc_out is not curently used
+            # This would be improved by implementing a class structure - note that the proc_out is not currently used
             # as we now deal with copying outputs elsewhere. Keeping it here as a placeholder.
             if proc_out is None:
                 proc_out = {
@@ -315,9 +282,14 @@ class ProcessorBase:
 
             status.reset('SUCCESS')
 
+        short_name: str = config['short_name']
+        clearup: bool = config['clearup']
+
         if status.is_success() & (clearup is True):
             logger.info('Clearing up')
 
+            start_date: str = config['StartString']
+            workspace_path = config['WorkspacePathout']
             clearup_dest_dir = f"{workspace_path}/clearup/{short_name}_{start_date}/"
             Path(clearup_dest_dir).mkdir(parents = True, exist_ok = True)
 
@@ -327,6 +299,64 @@ class ProcessorBase:
 
         end_script(premature = False)
 
+
+    @staticmethod
+    def build_config(args: dict,
+                     config_path: str,
+                     sys_config_path: str) -> dict:
+
+        """
+        Build a 'unified' dict by reading the sys and run config json files and replacing any tokens in the
+        run_config with values from the sys_config. Additionally, values set in the args are used to supplement
+        the config dict generated from the json.
+
+        :param args: command-line args to merge into the config (overriding any clashing keys)
+        :param config_path: path to the run config json file
+        :param sys_config_path: path to the sys config json file
+        :return: the unified config dict
+        """
+        try:
+            sys_config = parse_json_file_with_tokens(sys_config_path)
+            # read the run config json, and replace any tokens with values from the sys_config (such as the
+            # directory root for the installation)
+            config: dict = parse_json_file_with_tokens(config_path, sys_config)
+            config['config_path'] = config_path
+
+            """
+            add the sys_config keys and values to the config dict (unless the key is already declared in the config)
+            """
+            for k, v in sys_config.items():
+                if k not in config.keys():
+                    config[k] = v
+                else:
+                    logger.warning(f"Key {k} already present in run config - not adding key with same name from "
+                                   f"the sys_config")
+
+            """
+            then add the args keys and values to the config; these override any keys that are already
+            in the config
+            """
+            for k, v in args.items():
+                if k in config.keys():
+                    logger.warning(f"Key {k} already present in run config - overriding key with same name from "
+                                   f"the sys args")
+                config[k] = v
+
+            # set various time values based on the start_date that is passed in via the args
+            config['StartString'] = config.pop('start_date')  # rename the 'start_date' arg to 'StartString'
+            # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC
+            start_time: datetime.datetime = datetime.datetime.strptime(config['StartString'] + '03', '%Y%m%d%H')
+            start_time_string: str = start_time.strftime('%Y-%m-%d-%H%M')
+            config['StartTime'] = start_time_string
+
+        except:
+            logger.exception(f"Failure in opening or checking config {config_path}")
+            # TODO: This case should test flagdir.jobStatus.__exit__()
+            raise  # endJob('ERROR',premature=True)
+
+        return config
+
+
     @abstractmethod
     def process_pre_job(self, args) -> bool:
         raise NotImplementedError
@@ -345,7 +375,11 @@ class ProcessorBase:
             args["component"] = component
             args["short_name"] = shortname
 
-            self.run_process(args)
+            sys_config_path: str = args['sys_config_path']
+            config_path: str = args['config_path']
+            config: dict = self.build_config(args, config_path, sys_config_path)
+
+            self.run_process(config)
 
         except SystemExit as e:
             print("caught with code " + str(e.code))
diff --git a/ews/coordinator/processor_epidemiology.py b/ews/coordinator/processor_epidemiology.py
index f3024b7..fa6d891 100644
--- a/ews/coordinator/processor_epidemiology.py
+++ b/ews/coordinator/processor_epidemiology.py
@@ -58,7 +58,7 @@ class ProcessorEpidemiology(ProcessorBase):
         config: dict = parse_json_file_with_tokens(config_file)
 
         #  determine end time, from config file or the input args
-        arg_start_date: str = input_args['start_date']
+        arg_start_date: str = input_args['StartString']
         if 'CalculationSpanDays' in config:
             calc_span_days = config['CalculationSpanDays']
         elif 'CalculationSpanDays' in input_args:
@@ -79,10 +79,18 @@ class ProcessorEpidemiology(ProcessorBase):
         return True
 
 
-    def create_epi_config_string(self, config,jobPath,startString,endString):
+    @staticmethod
+    def create_epi_config_file_root(config: dict,
+                                    job_path: str,
+                                    start_string: str,
+                                    calc_span_days: list[int]) -> str:
 
-        configtemplate_fn = config['ConfigFilePath']
-        configName_withoutEpi = f"{os.path.basename(configtemplate_fn).replace('.json','')}_{startString}-{endString}"
+        configtemplate_filename: str = config['config_path']
+        start_date, end_date = calc_epi_date_range(start_string, calc_span_days)
+        start_string = start_date.strftime('%Y%m%d')
+        end_string = end_date.strftime('%Y%m%d')
+
+        configName_withoutEpi = f"{os.path.basename(configtemplate_filename).replace('.json','')}_{start_string}-{end_string}"
 
         # create a string describing every epi calc configuration
         epiStrings = []
@@ -96,7 +104,7 @@ class ProcessorEpidemiology(ProcessorBase):
             epiCaseString = f"{epiconf['model'].lower()}{epiKwargsString}"
 
             # provide to configuration for output filename
-            epiconf["infectionRasterFileName"] = f"{jobPath}/infections_{configName_withoutEpi}_{epiCaseString}"
+            epiconf["infectionRasterFileName"] = f"{job_path}/infections_{configName_withoutEpi}_{epiCaseString}"
 
             epiStrings += [epiCaseString]
 
@@ -292,12 +300,14 @@ class ProcessorEpidemiology(ProcessorBase):
 
         reference_date_str = config['StartString']
         reference_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d')
+        calc_span_days: list[int] = config['CalculationSpanDays']
 
-        start_date, end_date = calc_epi_date_range(reference_date_str, config['CalculationSpanDays'])
+        start_date, end_date = calc_epi_date_range(reference_date_str, calc_span_days)
 
         start_string = start_date.strftime('%Y-%m-%d-%H%M')
         start_string_short = start_date.strftime('%Y%m%d%H%M')
         end_string = end_date.strftime('%Y-%m-%d-%H%M')
+        end_string_short = end_date.strftime('%Y%m%d%H%M')
 
         # update config accordingly
         config['ReferenceTime'] = reference_date_str
@@ -309,7 +319,7 @@ class ProcessorEpidemiology(ProcessorBase):
         yesterday_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d') - datetime.timedelta(days=1)
         yesterday_string = yesterday_date.strftime('%Y%m%d')
 
-        diseases = config['DiseaseNames']
+        diseases: list[str] = config['DiseaseNames']
 
         # get list of variable names to be loaded from deposition input
         depo_variable_names =  config['Deposition']['VariableNames']
@@ -342,13 +352,14 @@ class ProcessorEpidemiology(ProcessorBase):
             logger.info(f"Preparing for epidemiology calc of {disease} in {region}")
 
             # create config_filename to describe job configuration
-            config_filename = self.create_epi_config_string(config, case_specific_path, start_string, end_string)
+            config_filename = self.create_epi_config_file_root(config, case_specific_path,
+                                                               reference_date_str, calc_span_days)
 
             # prepare a directory for input data
             jobDataPath = f"{case_specific_path}/input_data/"
             Path(jobDataPath).mkdir(parents=True, exist_ok=True)
 
-            lastjobDataPath = jobDataPath.replace(f"EPI_{reference_date_str}",f"EPI_{yesterday_string}")
+            lastjobDataPath = jobDataPath.replace(f"EPI_{reference_date_str}", f"EPI_{yesterday_string}")
 
             # configure filename of prepared deposition data
 
diff --git a/ews/coordinator/utils/processor_utils.py b/ews/coordinator/utils/processor_utils.py
index 75a2eff..6ff72ce 100644
--- a/ews/coordinator/utils/processor_utils.py
+++ b/ews/coordinator/utils/processor_utils.py
@@ -277,7 +277,7 @@ def end_job(status, ignore_inprogress=False, **kwargs):
 
     end_script(**kwargs)
 
-def calc_epi_date_range(init_str, span_days = [0, 6]):
+def calc_epi_date_range(init_str: str, span_days: list[int] = [0, 6]) -> tuple[datetime, datetime]:
     '''Date range is determined relative to init_date.
     span_days is usually defined in the job config file. Day zero is current
     day, negative values point to past (historical or analysis) days, and
diff --git a/tests/integration/partial/integration_test_utils.py b/tests/integration/partial/integration_test_utils.py
index 5b4b000..91d0fef 100644
--- a/tests/integration/partial/integration_test_utils.py
+++ b/tests/integration/partial/integration_test_utils.py
@@ -226,8 +226,6 @@ class IntegrationTestUtils:
         args_dict['start_date'] = start_date
         args_dict['component'] = component
         args_dict['short_name'] = shortname
-        args_dict['sys_config_path'] = sys_config_path
-        args_dict['config_path'] = config_path
         args_dict['log_level'] = 'info'
         args_dict['clearup'] = True
 
@@ -238,7 +236,8 @@ class IntegrationTestUtils:
         os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH
 
         try:
-            processor.run_process(args_dict)
+            config: dict = processor.build_config(args_dict, config_path, sys_config_path)
+            processor.run_process(config)
         except SystemExit as e:
             print(f"SystemExit: {e}")
             # we will eventually want to throw these to the calling class to be dealt with
@@ -253,14 +252,15 @@ class IntegrationTestUtils:
 
         args_dict: dict = {}
 
+        config_path: str = IntegrationTestUtils.RUN_CONFIG_FILE_PATH
+        sys_config_path: str = IntegrationTestUtils.RUN_SYS_CONFIG_FILE_PATH
+
         # note, possible to override these values in the kwargs loop below
         args_dict['live'] = False
         args_dict['noupload'] = True
         args_dict['start_date'] = start_date
         args_dict['component'] = component
         args_dict['short_name'] = short_name
-        args_dict['sys_config_path'] = IntegrationTestUtils.RUN_SYS_CONFIG_FILE_PATH
-        args_dict['config_path'] = IntegrationTestUtils.RUN_CONFIG_FILE_PATH
         args_dict['log_level'] = 'info'
         args_dict['clearup'] = True
 
@@ -271,7 +271,8 @@ class IntegrationTestUtils:
         os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH
 
         try:
-            processor.run_process(args_dict)
+            config: dict = processor.build_config(args_dict, config_path, sys_config_path)
+            processor.run_process(config)
         except SystemExit:
             # we will eventually want to throw these to the calling class to be dealt with
             pass
-- 
GitLab