From 8e6db6196044a6adf0eceae3146e9478df849435 Mon Sep 17 00:00:00 2001
From: lb584 <lb584@cam.ac.uk>
Date: Thu, 29 Aug 2024 10:24:12 +0100
Subject: [PATCH] remove code that allowed multiple configs to be passed from
 the command line, as it is no longer needed; add the ability to set
 additional values in the run config from the command-line args (if not
 already present in the config json); modify the epi processor to look for
 CalculationSpanDays in both the config json and the command-line args, in
 preparation for use with the Epi fitting, which will pass the calculation
 span as an arg

---
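Reviewer note: a minimal sketch of the merge precedence that run_process() now
applies when assembling the job config (build_run_config is an illustrative
name, not part of the codebase; the real code reads the run config with
parse_json_file_with_tokens and merges in place):

    def build_run_config(config: dict, sys_config: dict, args: dict) -> dict:
        # sys_config only fills gaps: keys already in the run config win
        for k, v in sys_config.items():
            if k not in config:
                config[k] = v
        # command-line args always win: they override existing keys
        for k, v in args.items():
            config[k] = v
        return config

    # resulting precedence: command-line args > run config json > sys_config
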
 ews/coordinator/processor_base.py             | 169 +++++++++---------
 ews/coordinator/processor_epidemiology.py     |  87 +++++----
 .../partial/integration_test_utils.py         |   6 +-
 3 files changed, 130 insertions(+), 132 deletions(-)
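
And the lookup order process_pre_job() now uses for CalculationSpanDays, as a
standalone sketch (resolve_calc_span and the [-2, 7] span are illustrative
only, not part of the codebase):

    def resolve_calc_span(config: dict, input_args: dict):
        # the run config json takes priority over the command-line args
        if 'CalculationSpanDays' in config:
            return config['CalculationSpanDays']
        if 'CalculationSpanDays' in input_args:
            return input_args['CalculationSpanDays']
        return None  # caller logs an error and fails the pre-job check

    span = resolve_calc_span({'CalculationSpanDays': [-2, 7]}, {})
    assert span is not None and len(span) == 2  # same sanity check as the code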

diff --git a/ews/coordinator/processor_base.py b/ews/coordinator/processor_base.py
index 535e4d3..3f1862f 100755
--- a/ews/coordinator/processor_base.py
+++ b/ews/coordinator/processor_base.py
@@ -45,6 +45,7 @@ class ProcessorBase:
 
     def __init__(self) -> None:
         super().__init__()
+        self.config: dict = {}  # the config for the current job; set in run_process()
         time_now = datetime.datetime.today()
         self.today_string = time_now.strftime('%Y%m%d')
         self.now_string = time_now.strftime('%Y%m%d-%H%M-%S')
@@ -72,12 +73,9 @@ class ProcessorBase:
                 '-c', '--config',
                 metavar = 'path',
                 type = str,
-                nargs = '+',  # require at least one path
-                dest = 'config_paths',
+                dest = 'config_path',
                 required = True,
-                help = '''path to a config file(s). More than one can be provided,
-        in which case each is worked on in turn (e.g. one each for stripe, stem, leaf).
-        Do not place other options between these.''')
+                help = '''path to a config file.''')
 
         my_parser.add_argument(
                 '-l', '--loglevel',
@@ -124,10 +122,6 @@ class ProcessorBase:
 
         print(f"Command-line options are:\n{args}")
 
-        if not isinstance(args.config_paths, list):
-            print('Expecting a list of config paths')
-            raise RuntimeError
-
         # check the startstring
         if args.start_date is not self.today_string:
             try:
@@ -213,11 +207,12 @@ class ProcessorBase:
         # across all provided configs
 
         sys_config_path: str = args['sys_config_path']
-        config_paths: List[str] = args['config_paths']
+        config_path: str = args['config_path']
         component: str = args['component']
         short_name: str = args['short_name']
         start_date: str = args['start_date']
         clearup: bool = args['clearup']
+        is_live: bool = args["live"]
 
         # load universal configuration
         sys_config = parse_json_file_with_tokens(sys_config_path)
@@ -233,7 +228,6 @@ class ProcessorBase:
         job_path: str = f'{workspace_path}{short_name}_{start_date}'
         self.prepare_job_directory(job_path)
 
-        is_live: bool = args["live"]
         log_file_path = f"{job_path}/log.txt"
 
         """
@@ -260,8 +254,8 @@ class ProcessorBase:
         ready = self.process_pre_job(args)
 
         # lock job directory
-        status: Jobstatus
-        with Jobstatus(job_path) as status:
+        status: Jobstatus = Jobstatus(job_path)
+        with status:
 
             # check for a status file in job directory
             if status.had_initial_status:
@@ -279,80 +273,87 @@ class ProcessorBase:
 
                 end_job(status, ignore_inprogress = True, premature = False)
 
-            # files and directories that will be earmarked for removal after a
-            # successful job
+            logger.info('Starting to work on the configuration')
+
+            logger.info(f'Working on config {config_path}')
+
+            try:
+                # read the run config json, and replace any tokens with values from the sys_config (such as the
+                # directory root for the installation)
+                config: dict = parse_json_file_with_tokens(config_path, sys_config)
+
+                # then add the sys_config keys and values to the config json
+                for k, v in sys_config.items():
+                    if k not in config.keys():
+                        config[k] = v
+                    else:
+                        logger.warning(f"Key {k} already present in run config - not adding key with same name from "
+                                       f"the sys_config")
+
+                """
+                then add the args keys and values to the configjson, we will override any keys that are already 
+                in the configjson
+                """
+                for k, v in args.items():
+                    if k in config.keys():
+                        logger.warning(f"Key {k} already present in run config - overriding key with same name from "
+                                       f"the sys args")
+                    config[k] = v
+
+            except Exception:
+                logger.exception(f"Failure in opening or checking config {config_path}")
+                # TODO: This case should test flagdir.jobStatus.__exit__()
+                raise  # endJob('ERROR',premature=True)
+
+            # provide specific case details to template config
+
+            # from configtemplate create configFileName to describe the specific job
+            config_file_name = f"{os.path.basename(config_path).replace('.json', '')}_{component}"
+
+            config['ConfigFilePath'] = config_file_name
+
+            # write the complete configuration file to job directory
+            with open(f"{job_path}/{config_file_name}.json", 'w') as write_file:
+                json.dump(config, write_file, indent = 4)
+
+            # proc_description = universal_config['ProcessInJob']
+            proc_description = 'ProcessInJob'
+            try:
+                proc_out: dict = self.process_in_job(job_path, status, config, component)
+            except Exception:
+                logger.exception("Error in process_in_job")
+                status.reset('ERROR')
+                end_job(status, premature = True)
+
+            # Set default case
+            # This would be improved by implementing a class structure - note that proc_out is not currently used
+            # as we now deal with copying outputs elsewhere. Keeping it here as a placeholder.
+            if proc_out is None:
+                proc_out = {
+                    'output':  None,
+                    'clearup': None}
+
             paths_to_clear = []
+            if 'clearup' in proc_out.keys():
+                append_item_to_list(
+                        proc_out['clearup'],
+                        paths_to_clear,
+                        proc_description,
+                        status)
 
-            logger.info('Starting to work on each configuration')
+            # Run EWS-plotting command
+
+            proc_description = 'ProcessEWSPlotting'
+            try:
+                self.process_post_job(job_path, config)
+            except Exception:
+                logger.exception(f"Error in {proc_description}()")
+                status.reset('ERROR')
+                end_job(status, premature = True)
+
+            logger.info('Finished with EWS-Plotting, appending images to list for transfer')
 
-            for configIndex, configtemplate in enumerate(config_paths):
-
-                config_paths_length: int = len(config_paths)
-                logger.info(f'Working on config {configIndex + 1} of {config_paths_length}')
-
-                try:
-                    config: dict = parse_json_file_with_tokens(configtemplate, sys_config)
-                    # then add the sys_config keys and values to the configjson
-                    for k, v in sys_config.items():
-                        if k not in config.keys():
-                            config[k] = v
-                        else:
-                            logger.warning(f"Key {k} already present in run config - not adding key with same name from "
-                                           f"the sys_config")
-
-                except:
-                    logger.exception(f"Failure in opening or checking config {configtemplate}")
-                    # TODO: This case should test flagdir.jobStatus.__exit__()
-                    raise  # endJob('ERROR',premature=True)
-
-                # provide specific case details to template config
-
-                # from configtemplate create configFileName to describe the specific job
-                config_file_name = f"{os.path.basename(configtemplate).replace('.json', '')}_{component}"
-
-                config['ConfigFilePath'] = config_file_name
-
-                # write the complete configuration file to job directory
-                with open(f"{job_path}/{config_file_name}.json", 'w') as write_file:
-                    json.dump(config, write_file, indent = 4)
-
-                # proc_description = universal_config['ProcessInJob']
-                proc_description = 'ProcessInJob'
-                try:
-                    proc_out: dict = self.process_in_job(job_path, status, config, component)
-                except:
-                    logger.exception(f"Error in process_in_job")
-                    status.reset('ERROR')
-                    end_job(status, premature = True)
-
-                # Set default case
-                # This would be improved by implementing a class structure - not ethat the proc_out is not curently used
-                # as we now deal with copying outputs elsewhere. Keeping it here as a placeholder.
-                if proc_out is None:
-                    proc_out = {
-                        'output':  None,
-                        'clearup': None}
-
-                if 'clearup' in proc_out.keys():
-                    append_item_to_list(
-                            proc_out['clearup'],
-                            paths_to_clear,
-                            proc_description,
-                            status)
-
-                # Run EWS-plotting command
-
-                proc_description = 'ProcessEWSPlotting'
-                try:
-                    self.process_post_job(job_path, config)
-                except:
-                    logger.exception(f"Error in {proc_description}()")
-                    status.reset('ERROR')
-                    end_job(status, premature = True)
-
-                logger.info('Finished with EWS-Plotting, appending images to list for transfer')
-
-                logger.info(f'Finished with config {configIndex + 1} of {config_paths_length}')
+            logger.info(f'Finished with config {config_path}')
 
             status.reset('SUCCESS')
 
diff --git a/ews/coordinator/processor_epidemiology.py b/ews/coordinator/processor_epidemiology.py
index 723bc3d..0a2aa23 100644
--- a/ews/coordinator/processor_epidemiology.py
+++ b/ews/coordinator/processor_epidemiology.py
@@ -52,29 +52,29 @@ class ProcessorEpidemiology(ProcessorBase):
 
         logger.info('started process_pre_job_epi()')
 
-        # check pre-requisite jobs are complete
-        # query_past_successes(input_args, 'Deposition')
-        #
-        # query_past_successes(input_args, 'Environment')
+        config_file: str = input_args['config_path']
 
-        config_fns: List[str] = input_args['config_paths']
+        # the config should be readable if the script made it this far, so no need for a try block
+        config: dict = parse_json_file_with_tokens(config_file)
 
-        for configFile in config_fns:
+        # determine end time, from the config file or the input args
+        arg_start_date: str = input_args['start_date']
+        if 'CalculationSpanDays' in config:
+            calc_span_days = config['CalculationSpanDays']
+        elif 'CalculationSpanDays' in input_args:
+            calc_span_days = input_args['CalculationSpanDays']
+        else:
+            logger.error('CalculationSpanDays not defined in config or input_args')
+            return False
 
-            # they should be working if the script made it this far, no need to try
-            config_i = parse_json_file_with_tokens(configFile)
+        assert len(calc_span_days) == 2
 
-            #determine end time, from config file
-            arg_start_date: str = input_args['start_date']
-            calc_span_days = config_i['CalculationSpanDays']
-            assert len(calc_span_days) == 2
+        start_time, end_time = calc_epi_date_range(arg_start_date, calc_span_days)
 
-            start_time, end_time = calc_epi_date_range(arg_start_date,calc_span_days)
-
-            # warn if it is a long timespan
-            date_diff = end_time - start_time
-            if date_diff.days > 100:
-                logger.warning("More than 100 days will be calculated over, likely longer than any single season")
+        # warn if it is a long timespan
+        date_diff = end_time - start_time
+        if date_diff.days > 100:
+            logger.warning("More than 100 days will be calculated over, likely longer than any single season")
 
         return True
 
@@ -286,14 +286,14 @@ class ProcessorEpidemiology(ProcessorBase):
         #   to end date, so choose an apporporiate lister, e.g.
         #   list_onefile_operational
 
-        is_continue = config.get('continue',False)
+        is_continue = config.get('continue', False)
 
         # initialise any needed variables
 
         reference_date_str = config['StartString']
         reference_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d')
 
-        start_date, end_date = calc_epi_date_range(reference_date_str,config['CalculationSpanDays'])
+        start_date, end_date = calc_epi_date_range(reference_date_str, config['CalculationSpanDays'])
 
         start_string = start_date.strftime('%Y-%m-%d-%H%M')
         start_string_short = start_date.strftime('%Y%m%d%H%M')
@@ -327,7 +327,7 @@ class ProcessorEpidemiology(ProcessorBase):
             config['SubRegionName'] = region
             config['DiseaseName'] = disease
 
-            config_epi = config.copy()
+            # config_epi = config.copy()  -- no longer needed; config is used directly below
 
             # TODO: CAUTION: Any iterations (e.g. disease or sub-region) are hidden
             # in jobPath, and not retained in the config file. This is a provlem for
@@ -342,7 +342,7 @@ class ProcessorEpidemiology(ProcessorBase):
             logger.info(f"Preparing for epidemiology calc of {disease} in {region}")
 
             # create config_filename to describe job configuration
-            config_filename = self.create_epi_config_string(config,case_specific_path,start_string,end_string)
+            config_filename = self.create_epi_config_string(config, case_specific_path, start_string, end_string)
 
             # prepare a directory for input data
             jobDataPath = f"{case_specific_path}/input_data/"
@@ -352,7 +352,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
             # configure filename of prepared deposition data
 
-            if 'Deposition' in config_epi:
+            if 'Deposition' in config:
 
                 # determine which variable name to load for this disease
                 disease_idx = [i for i,j in enumerate(diseases) if j==disease][0]
@@ -361,7 +361,6 @@ class ProcessorEpidemiology(ProcessorBase):
 
                 try:
                     prep.gather_dependent_models(
-                            config_epi,
                             config,
                             variable_name,
                             start_date,
@@ -378,13 +377,12 @@ class ProcessorEpidemiology(ProcessorBase):
             
             # configure filename of prepared environment data
 
-            if 'Environment' in config_epi:
+            if 'Environment' in config:
 
                 logger.info('Preparing environmental suitability data')
 
                 try:
                     prep.gather_dependent_models(
-                            config_epi,
                             config,
                             variable_name,
                             start_date,
@@ -406,13 +404,13 @@ class ProcessorEpidemiology(ProcessorBase):
             logger.info('Preparing a copy of the host raster data')
 
             # TargetRaster defines the grid that the epi model works on.
-            assert 'TargetRaster' in config_epi['Host']
+            assert 'TargetRaster' in config['Host']
 
             # It should have been generated in advance by the user, by reprojecting
             # the available host map (e.g. MapSPAM) to the NAME output grid.
             # wheat_raster_reprojection.py is available to support this.
 
-            if 'HostRasters' in config_epi['Host']:
+            if 'HostRasters' in config['Host']:
                 # HostRasters is a dictionary with date: filename entries describing
                 # different host rasters valid at different times i.e. a simple
                 # representation of dynamic host, so prepare a host file as is done
@@ -420,7 +418,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
                 # All host maps should have the same spatial grid as the TargetRaster
 
-                rasters_dict = config_epi['Host']['HostRasters']
+                rasters_dict = config['Host']['HostRasters']
 
                 dst_host_csv = f"{jobDataPath}/data_input_host.csv"
 
@@ -429,13 +427,13 @@ class ProcessorEpidemiology(ProcessorBase):
             else:
                 # There is a host raster applicable to all times, i.e. static host
 
-                src_host = config_epi['Host']['TargetRaster']
+                src_host = config['Host']['TargetRaster']
                 fn_host = os.path.basename(src_host)
                 dst_host = f"{jobDataPath}/{fn_host}"
 
                 # copy the tif to the job directory and refer to that instead
                 shutil.copyfile(src_host,dst_host)
-                config_epi['Host']['TargetRaster'] = dst_host
+                config['Host']['TargetRaster'] = dst_host
 
                 logger.info('Preparing a copy of the host data as csv')
 
@@ -445,15 +443,15 @@ class ProcessorEpidemiology(ProcessorBase):
                         {'201001010000': dst_host},
                         dst_host_csv)
 
-            config_epi['Host']['HostCSV'] = dst_host_csv
-            config_epi['Host']['FileNamePrepared'] = dst_host_csv
+            config['Host']['HostCSV'] = dst_host_csv
+            config['Host']['FileNamePrepared'] = dst_host_csv
 
             # Preparing any continue-run files
             if is_continue is True:
 
                 logger.debug('This is a continue run.')
 
-                for ci in config_epi['Epi']:
+                for ci in config['Epi']:
                 
                     model_name = ci['model']
 
@@ -463,7 +461,6 @@ class ProcessorEpidemiology(ProcessorBase):
                     
                     try:
                         prep.gather_dependent_models(
-                                config_epi,
                                 config,
                                 variable_name,
                                 start_date,
@@ -482,12 +479,12 @@ class ProcessorEpidemiology(ProcessorBase):
             # provide fundamental config elements to config_epi
             for k,v in config.items():
                 if k not in short_name.keys():
-                    config_epi[k]=v
+                    config[k] = v
 
             if is_continue is True:
                 # This will be used in the epi model and not the data preparation
                 continue_start_date = reference_date+datetime.timedelta(hours=3)
-                config_epi['ContinueStartTime'] = continue_start_date.strftime('%Y-%m-%d-%H%M')
+                config['ContinueStartTime'] = continue_start_date.strftime('%Y-%m-%d-%H%M')
 
             def print_item(item):
                 logger.debug(f"Item {item}")
@@ -503,16 +500,16 @@ class ProcessorEpidemiology(ProcessorBase):
 
             logger.debug('Incremental configuration looks like:')
 
-            iterate(config_epi)
+            iterate(config)
 
             logger.debug('Complete configuration looks like:')
-            logger.debug(json.dumps(config_epi,indent=2))
+            logger.debug(json.dumps(config, indent=2))
 
             # write the complete configuration file to job directory
             run_config_fn = f"{case_specific_path}/{config_filename}.json"
 
             with open(run_config_fn,'w') as write_file:
-                json.dump(config_epi,write_file,indent=4)
+                json.dump(config, write_file, indent=4)
 
             # run epi model
 
@@ -535,7 +532,7 @@ class ProcessorEpidemiology(ProcessorBase):
             def calc_mean(arr):
                 return 'mean', arr.mean()
 
-            for epiconf in config_epi['Epi']:
+            for epiconf in config['Epi']:
 
                 outfile = epiconf["infectionRasterFileName"]
 
@@ -565,7 +562,7 @@ class ProcessorEpidemiology(ProcessorBase):
             figure_func = getattr(EpiAnalysis, 'plot_compare_epi_cases')
 
             # isolate the config for this function, in case of modifications
-            config_epi_for_comparison = config_epi.copy()
+            config_epi_for_comparison = config.copy()
 
             fig,axes,cases = figure_func(
                     config_epi_for_comparison,
@@ -578,7 +575,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
             # slice the epi results into before forecast and in forecast
 
-            for epiconf in config_epi['Epi']:
+            for epiconf in config['Epi']:
 
                 unit_description = ''
                 if epiconf.get('rescale_output_by_host_raster',False) is True:
@@ -587,12 +584,12 @@ class ProcessorEpidemiology(ProcessorBase):
 
                 # load the full epi results
                 df_full = read_csv(outfile,header=[0],index_col=[0,1])
-                column_date_fmt = f"{config_epi['StartTimeShort']}_%Y%m%d%H%M"
+                column_date_fmt = f"{config['StartTimeShort']}_%Y%m%d%H%M"
                 df_full_dates = to_datetime(df_full.columns.astype('str'),format=column_date_fmt)
 
                 # determine date to cut with
                 # plus 1 minute so midnight is associated with preceding day
-                date_to_cut = datetime.datetime.strptime(config_epi['StartString']+'0001','%Y%m%d%H%M')
+                date_to_cut = datetime.datetime.strptime(config['StartString'] + '0001', '%Y%m%d%H%M')
                 dates_after_cut = df_full_dates >= date_to_cut
                 idx = argmax(dates_after_cut)-1
 
diff --git a/tests/integration/partial/integration_test_utils.py b/tests/integration/partial/integration_test_utils.py
index 54e48dd..5b4b000 100644
--- a/tests/integration/partial/integration_test_utils.py
+++ b/tests/integration/partial/integration_test_utils.py
@@ -218,7 +218,7 @@ class IntegrationTestUtils:
         args_dict: dict = {}
 
         sys_config_path = IntegrationTestUtils.RUN_SYS_CONFIG_FILE_PATH
-        config_paths = [IntegrationTestUtils.RUN_CONFIG_FILE_PATH]
+        config_path = IntegrationTestUtils.RUN_CONFIG_FILE_PATH
 
         # note, possible to override these values in the kwargs loop below
         args_dict['live'] = False
@@ -227,7 +227,7 @@ class IntegrationTestUtils:
         args_dict['component'] = component
         args_dict['short_name'] = shortname
         args_dict['sys_config_path'] = sys_config_path
-        args_dict['config_paths'] = config_paths
+        args_dict['config_path'] = config_path
         args_dict['log_level'] = 'info'
         args_dict['clearup'] = True
 
@@ -260,7 +260,7 @@ class IntegrationTestUtils:
         args_dict['component'] = component
         args_dict['short_name'] = short_name
         args_dict['sys_config_path'] = IntegrationTestUtils.RUN_SYS_CONFIG_FILE_PATH
-        args_dict['config_paths'] = [IntegrationTestUtils.RUN_CONFIG_FILE_PATH]
+        args_dict['config_path'] = IntegrationTestUtils.RUN_CONFIG_FILE_PATH
         args_dict['log_level'] = 'info'
         args_dict['clearup'] = True
 
-- 
GitLab