diff --git a/ews/coordinator/processor_base.py b/ews/coordinator/processor_base.py
index fb905c664910cfb945de19d4588c8aa5a5d60a0d..241e617a3f30ca2e89732b062c7617632e0a8ed4 100755
--- a/ews/coordinator/processor_base.py
+++ b/ews/coordinator/processor_base.py
@@ -45,7 +45,6 @@ class ProcessorBase:
 
     def __init__(self) -> None:
         super().__init__()
-        self.config: dict = {}  # this is the config for the current job, gets set in run_process()
         time_now = datetime.datetime.today()
         self.today_string = time_now.strftime('%Y%m%d')
         self.now_string = time_now.strftime('%Y%m%d-%H%M-%S')
@@ -141,6 +140,22 @@ class ProcessorBase:
         dictionary: dict = vars(args)
         return dictionary
 
+    @staticmethod
+    def generate_job_directory_path(config: dict) -> str:
+        """
+        Generate the job directory path from the workspace path, short name and start date.
+
+        :param config: the unified job config dict
+        :return: the job directory path
+        """
+        short_name: str = config['short_name']
+        start_date: str = config['StartString']
+        workspace_path = config['WorkspacePathout']
+
+        job_path: str = f'{workspace_path}{short_name}_{start_date}'
+
+        return job_path
+
     def prepare_job_directory(self, job_path: str):
         """
         create job directory or archive if already exists (due to a rerun)
@@ -158,57 +173,36 @@ class ProcessorBase:
 
     def run_process(self,
-                    args: dict) -> None:
-
-        # check initial state of each config file, and gather terms that must apply
-        # across all provided configs
-
-        sys_config_path: str = args['sys_config_path']
-        config_path: str = args['config_path']
-        component: str = args['component']
-        short_name: str = args['short_name']
-        start_date: str = args['start_date']
-        clearup: bool = args['clearup']
-        is_live: bool = args["live"]
-
-        # load universal configuration
-        sys_config = parse_json_file_with_tokens(sys_config_path)
-
-        # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC
-        start_time: datetime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H')
-        start_time_string: str = start_time.strftime('%Y-%m-%d-%H%M')
-        sys_config['StartTime'] = start_time_string
-        sys_config['StartString'] = start_date
-
-        # determine job directory
-        workspace_path = sys_config['WorkspacePathout']
-        job_path: str = f'{workspace_path}{short_name}_{start_date}'
+                    config: dict) -> None:
+
+        job_path: str = self.generate_job_directory_path(config)
         self.prepare_job_directory(job_path)
 
         log_file_path = f"{job_path}/log.txt"
 
         """
-        Now we have the job_dir defined, we can set up the logging
+        Now we have the job_dir defined, we can set up the logging (logs are written into the job dir)
         """
-        if 'LoggingConfigPath' not in sys_config.keys():
-            raise Exception(f"'LoggingConfigPath' not found in sys_config. Either use the default template file in "
-                            f"configs/logger/template_log_config.json, or copy the template away and edit")
+        if 'LoggingConfigPath' not in config.keys():
+            raise Exception(f"'LoggingConfigPath' not found in config (it is set in the sys_config). Either use "
+                            f"the default template file in configs/logger/template_log_config.json, or copy the "
+                            f"template and edit the copy")
 
-        log_config_path = sys_config['LoggingConfigPath']
+        log_config_path = config['LoggingConfigPath']
+        is_live: bool = config["live"]
         processor_utils.setup_logging(log_file_path,
                                       log_config_path,
                                       is_live,
-                                      args['log_level'])
+                                      config['log_level'])
 
         logger.info("==========")
         logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}")
-        logger.info(f"Universal config is\n{json.dumps(sys_config, indent = 2)}")
+        logger.info(f"Unified config is\n{json.dumps(config, indent = 2)}")
         logger.info(f"Job path will be {job_path}")
-        # workspace_path = sys_config['WorkspacePathout']
 
         # run any checks before creating a job directory
         # if this fails, then make a note once there is a job directory
-        ready = self.process_pre_job(args)
+        ready = self.process_pre_job(config)
 
         # lock job directory
         status: Jobstatus = Jobstatus(job_path)
@@ -232,43 +226,15 @@
 
         logger.info('Starting to work on each configuration')
 
+        config_path: str = config['config_path']
         logger.info(f'Working on config {config_path}')
 
-        try:
-            # read the run config json, and replace any tokens with values from the sys_config (such at the
-            # directory root for the installation)
-            config: dict = parse_json_file_with_tokens(config_path, sys_config)
-            self.config = config  # set the class variable so it can be accessed externally
-
-            # then add the sys_config keys and values to the configjson
-            for k, v in sys_config.items():
-                if k not in config.keys():
-                    config[k] = v
-                else:
-                    logger.warning(f"Key {k} already present in run config - not adding key with same name from "
-                                   f"the sys_config")
-
-            """
-            then add the args keys and values to the configjson, we will override any keys that are already
-            in the configjson
-            """
-            for k, v in args.items():
-                if k in config.keys():
-                    logger.warning(f"Key {k} already present in run config - overriding key with same name from "
-                                   f"the sys args")
-                config[k] = v
-
-        except:
-            logger.exception(f"Failure in opening or checking config {config_path}")
-            # TODO: This case should test flagdir.jobStatus.__exit__()
-            raise  # endJob('ERROR',premature=True)
-
-        # provide specific case details to template config
-        # from configtemplate create configFileName to describe the specific job
-        config_file_name = f"{os.path.basename(config_path).replace('.json', '')}_{component}"
-
-        config['ConfigFilePath'] = config_file_name
+        # from the config template, store the base filename (without extension)
+        config_file_name = os.path.basename(config_path)
+        config_file_name = os.path.splitext(config_file_name)[0]
+        # config['ConfigFilePath'] = config_file_name
 
         # write the complete configuration file to job directory
         with open(f"{job_path}/{config_file_name}.json", 'w') as write_file:
@@ -277,6 +243,7 @@
         # proc_description = universal_config['ProcessInJob']
         proc_description = 'ProcessInJob'
         try:
+            component: str = config['component']
             proc_out: dict = self.process_in_job(job_path, status, config, component)
         except:
             logger.exception(f"Error in process_in_job")
@@ -284,7 +251,7 @@
             end_job(status, premature = True)
 
         # Set default case
-        # This would be improved by implementing a class structure - not ethat the proc_out is not curently used
+        # This would be improved by implementing a class structure - note that the proc_out is not currently used
         # as we now deal with copying outputs elsewhere. Keeping it here as a placeholder.
         if proc_out is None:
             proc_out = {
@@ -315,9 +282,14 @@
 
         status.reset('SUCCESS')
 
+        short_name: str = config['short_name']
+        clearup: bool = config['clearup']
+
         if status.is_success() & (clearup is True):
             logger.info('Clearing up')
 
+            start_date: str = config['StartString']
+            workspace_path = config['WorkspacePathout']
             clearup_dest_dir = f"{workspace_path}/clearup/{short_name}_{start_date}/"
             Path(clearup_dest_dir).mkdir(parents = True, exist_ok = True)
 
@@ -327,6 +299,64 @@
 
         end_script(premature = False)
 
+
+    @staticmethod
+    def build_config(args: dict,
+                     config_path: str,
+                     sys_config_path: str) -> dict:
+
+        """
+        Builds a 'unified' dict by reading the sys and run config json files and replacing any tokens with values from
+        the sys_config in the run_config. Additionally, values set in the args are used to supplement the config dict
+        generated from the json.
+
+        :param args: command-line args; these override keys of the same name in the config
+        :param config_path: path to the run config json file
+        :param sys_config_path: path to the sys config json file
+        :return: the unified config dict
+        """
+        try:
+            sys_config = parse_json_file_with_tokens(sys_config_path)
+            # read the run config json, and replace any tokens with values from the sys_config (such as the
+            # directory root for the installation)
+            config: dict = parse_json_file_with_tokens(config_path, sys_config)
+            config['config_path'] = config_path
+
+            """
+            add the sys_config keys and values to the config dict (unless it is already declared in the config)
+            """
+            for k, v in sys_config.items():
+                if k not in config.keys():
+                    config[k] = v
+                else:
+                    logger.warning(f"Key {k} already present in run config - not adding key with same name from "
+                                   f"the sys_config")
+
+            """
+            then add the args keys and values to the config, we will override any keys that are already
+            in the config
+            """
+            for k, v in args.items():
+                if k in config.keys():
+                    logger.warning(f"Key {k} already present in run config - overriding key with same name from "
+                                   f"the sys args")
+                config[k] = v
+
+            # set various time values based on the start_date that is passed in on the args
+            config['StartString'] = config.pop('start_date')  # we are renaming the 'start_date' args to 'StartString'
+            # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC
+            start_time: datetime.datetime = datetime.datetime.strptime(config['StartString'] + '03', '%Y%m%d%H')
+            start_time_string: str = start_time.strftime('%Y-%m-%d-%H%M')
+            config['StartTime'] = start_time_string
+
+        except:
+            logger.exception(f"Failure in opening or checking config {config_path}")
+            # TODO: This case should test flagdir.jobStatus.__exit__()
+            raise  # endJob('ERROR',premature=True)
+
+        return config
+
+
     @abstractmethod
     def process_pre_job(self, args) -> bool:
         raise NotImplementedError
@@ -345,7 +375,11 @@
             args["component"] = component
             args["short_name"] = shortname
 
-            self.run_process(args)
+            sys_config_path: str = args['sys_config_path']
+            config_path: str = args['config_path']
+            config: dict = self.build_config(args, config_path, sys_config_path)
+
+            self.run_process(config)
 
         except SystemExit as e:
             print("caught with code " + str(e.code))
diff --git a/ews/coordinator/processor_epidemiology.py b/ews/coordinator/processor_epidemiology.py
index f3024b76193b2d274281bd809e3d8369173148ca..fa6d891b137ca3c7848e4a252bfb62c6143fe7e5 100644
--- a/ews/coordinator/processor_epidemiology.py
+++ b/ews/coordinator/processor_epidemiology.py
@@ -58,7 +58,7 @@ class ProcessorEpidemiology(ProcessorBase):
         config: dict = parse_json_file_with_tokens(config_file)
 
         # determine end time, from config file or the input args
-        arg_start_date: str = input_args['start_date']
+        arg_start_date: str = input_args['StartString']
         if 'CalculationSpanDays' in config:
             calc_span_days = config['CalculationSpanDays']
         elif 'CalculationSpanDays' in input_args:
@@ -79,10 +79,18 @@ class ProcessorEpidemiology(ProcessorBase):
 
         return True
 
-    def create_epi_config_string(self, config,jobPath,startString,endString):
+    @staticmethod
+    def create_epi_config_file_root(config: dict,
+                                    job_path: str,
+                                    start_string: str,
+                                    calc_span_days: list[int]) -> str:
 
-        configtemplate_fn = config['ConfigFilePath']
-        configName_withoutEpi = f"{os.path.basename(configtemplate_fn).replace('.json','')}_{startString}-{endString}"
+        configtemplate_filename: str = config['config_path']
+        start_date, end_date = calc_epi_date_range(start_string, calc_span_days)
+        start_string = start_date.strftime('%Y%m%d')
+        end_string = end_date.strftime('%Y%m%d')
+
+        configName_withoutEpi = f"{os.path.basename(configtemplate_filename).replace('.json','')}_{start_string}-{end_string}"
 
         # create a string describing every epi calc configuration
         epiStrings = []
@@ -96,7 +104,7 @@ class ProcessorEpidemiology(ProcessorBase):
             epiCaseString = f"{epiconf['model'].lower()}{epiKwargsString}"
 
             # provide to configuration for output filename
-            epiconf["infectionRasterFileName"] = f"{jobPath}/infections_{configName_withoutEpi}_{epiCaseString}"
+            epiconf["infectionRasterFileName"] = f"{job_path}/infections_{configName_withoutEpi}_{epiCaseString}"
 
             epiStrings += [epiCaseString]
 
@@ -292,12 +300,14 @@ class ProcessorEpidemiology(ProcessorBase):
 
         reference_date_str = config['StartString']
         reference_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d')
+        calc_span_days: list[int] = config['CalculationSpanDays']
 
-        start_date, end_date = calc_epi_date_range(reference_date_str, config['CalculationSpanDays'])
+        start_date, end_date = calc_epi_date_range(reference_date_str, calc_span_days)
 
         start_string = start_date.strftime('%Y-%m-%d-%H%M')
         start_string_short = start_date.strftime('%Y%m%d%H%M')
         end_string = end_date.strftime('%Y-%m-%d-%H%M')
+        end_string_short = end_date.strftime('%Y%m%d%H%M')
 
         # update config accordingly
         config['ReferenceTime'] = reference_date_str
@@ -309,7 +319,7 @@ class ProcessorEpidemiology(ProcessorBase):
         yesterday_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d') - datetime.timedelta(days=1)
         yesterday_string = yesterday_date.strftime('%Y%m%d')
 
-        diseases = config['DiseaseNames']
+        diseases: list[str] = config['DiseaseNames']
 
         # get list of variable names to be loaded from deposition input
         depo_variable_names = config['Deposition']['VariableNames']
@@ -342,13 +352,14 @@ class ProcessorEpidemiology(ProcessorBase):
             logger.info(f"Preparing for epidemiology calc of {disease} in {region}")
 
             # create config_filename to describe job configuration
-            config_filename = self.create_epi_config_string(config, case_specific_path, start_string, end_string)
+            config_filename = self.create_epi_config_file_root(config, case_specific_path,
+                                                               reference_date_str, calc_span_days)
 
             # prepare a directory for input data
             jobDataPath = f"{case_specific_path}/input_data/"
             Path(jobDataPath).mkdir(parents=True, exist_ok=True)
 
-            lastjobDataPath = jobDataPath.replace(f"EPI_{reference_date_str}",f"EPI_{yesterday_string}")
+            lastjobDataPath = jobDataPath.replace(f"EPI_{reference_date_str}", f"EPI_{yesterday_string}")
 
             # configure filename of prepared deposition data
 
diff --git a/ews/coordinator/utils/processor_utils.py b/ews/coordinator/utils/processor_utils.py
index 75a2eff38d884e12f1249b9de84c203a7edcd2f4..6ff72ce0b279b72466f1cd2c76d1f303e9a9e5fb 100644
--- a/ews/coordinator/utils/processor_utils.py
+++ b/ews/coordinator/utils/processor_utils.py
@@ -277,7 +277,7 @@ def end_job(status, ignore_inprogress=False, **kwargs):
 
     end_script(**kwargs)
 
-def calc_epi_date_range(init_str, span_days = [0, 6]):
+def calc_epi_date_range(init_str: str, span_days: list[int] = [0, 6]) -> tuple[datetime, datetime]:
     '''Date range is determined relative to init_date.
     span_days is usually defined in the job config file.
     Day zero is current day, negative values point to past (historical or analysis) days, and
diff --git a/tests/integration/partial/integration_test_utils.py b/tests/integration/partial/integration_test_utils.py
index 5b4b000c3e3258143f270d753fa2f97e3d9ea315..91d0fefc0493366821438c73dc1ba718e4e2cc2f 100644
--- a/tests/integration/partial/integration_test_utils.py
+++ b/tests/integration/partial/integration_test_utils.py
@@ -226,8 +226,6 @@ class IntegrationTestUtils:
         args_dict['start_date'] = start_date
         args_dict['component'] = component
         args_dict['short_name'] = shortname
-        args_dict['sys_config_path'] = sys_config_path
-        args_dict['config_path'] = config_path
         args_dict['log_level'] = 'info'
         args_dict['clearup'] = True
 
@@ -238,7 +236,8 @@ class IntegrationTestUtils:
         os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH
 
         try:
-            processor.run_process(args_dict)
+            config: dict = processor.build_config(args_dict, config_path, sys_config_path)
+            processor.run_process(config)
         except SystemExit as e:
             print(f"SystemExit: {e}")
             # we will eventually want to throw these to the calling class to be dealt with
@@ -253,14 +252,15 @@ class IntegrationTestUtils:
 
         args_dict: dict = {}
 
+        config_path: str = IntegrationTestUtils.RUN_CONFIG_FILE_PATH
+        sys_config_path: str = IntegrationTestUtils.RUN_SYS_CONFIG_FILE_PATH
+
         # note, possible to override these values in the kwargs loop below
         args_dict['live'] = False
         args_dict['noupload'] = True
         args_dict['start_date'] = start_date
         args_dict['component'] = component
         args_dict['short_name'] = short_name
-        args_dict['sys_config_path'] = IntegrationTestUtils.RUN_SYS_CONFIG_FILE_PATH
-        args_dict['config_path'] = IntegrationTestUtils.RUN_CONFIG_FILE_PATH
         args_dict['log_level'] = 'info'
         args_dict['clearup'] = True
 
@@ -271,7 +271,8 @@ class IntegrationTestUtils:
         os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH
 
         try:
-            processor.run_process(args_dict)
+            config: dict = processor.build_config(args_dict, config_path, sys_config_path)
+            processor.run_process(config)
         except SystemExit:
             # we will eventually want to throw these to the calling class to be dealt with
             pass
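
Reviewer note (not part of the patch): a minimal sketch of how the refactored entry points are expected to fit together. It assumes the ProcessorEpidemiology class touched by this diff; the args keys mirror the args_dict built in integration_test_utils.py, while the two config paths and the component/short_name values are hypothetical placeholders.

    from ews.coordinator.processor_epidemiology import ProcessorEpidemiology

    # args mirror the args_dict assembled in the integration tests
    args = {
        'live': False,
        'noupload': True,
        'start_date': '20240101',     # build_config() renames this key to 'StartString'
        'component': 'Epidemiology',  # hypothetical component name
        'short_name': 'epi',          # hypothetical short name
        'log_level': 'info',
        'clearup': False,
        'sys_config_path': 'configs/sys/sys_config.json',  # hypothetical path
        'config_path': 'configs/epi/run_config.json',      # hypothetical path
    }

    processor = ProcessorEpidemiology()

    # build_config() parses both json files, resolves tokens in the run config
    # against the sys_config, merges in sys_config keys that are not already set,
    # then overlays the args (args win on key clashes) and derives StartTime.
    config = processor.build_config(args, args['config_path'], args['sys_config_path'])

    # run_process() now consumes this single unified dict instead of the old
    # (args + sys_config + run config) trio, so keys such as 'WorkspacePathout'
    # and 'LoggingConfigPath' must arrive via the sys_config.
    processor.run_process(config)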