diff --git a/ews/coordinator/processor_base.py b/ews/coordinator/processor_base.py
index 535e4d343dac3c9d73907b7b24390e86f6292329..3f1862f67628ed90cf351f85442871f5eea7fab6 100755
--- a/ews/coordinator/processor_base.py
+++ b/ews/coordinator/processor_base.py
@@ -45,6 +45,7 @@ class ProcessorBase:
 
     def __init__(self) -> None:
         super().__init__()
+        self.config: dict = {}  # the config for the current job; it gets set in run_process()
         time_now = datetime.datetime.today()
         self.today_string = time_now.strftime('%Y%m%d')
         self.now_string = time_now.strftime('%Y%m%d-%H%M-%S')
@@ -72,12 +73,9 @@ class ProcessorBase:
            '-c', '--config',
            metavar = 'path',
            type = str,
-           nargs = '+',  # require at least one path
-           dest = 'config_paths',
+           dest = 'config_path',
            required = True,
-           help = '''path to a config file(s). More than one can be provided,
-           in which case each is worked on in turn (e.g. one each for stripe, stem, leaf).
-           Do not place other options between these.''')
+           help = '''path to a config file.''')
 
         my_parser.add_argument(
            '-l', '--loglevel',
@@ -124,10 +122,6 @@ class ProcessorBase:
 
        print(f"Command-line options are:\n{args}")
 
-       if not isinstance(args.config_paths, list):
-           print('Expecting a list of config paths')
-           raise RuntimeError
-
        # check the startstring
        if args.start_date is not self.today_string:
            try:
@@ -213,11 +207,12 @@ class ProcessorBase:
 
        # across all provided configs
        sys_config_path: str = args['sys_config_path']
-       config_paths: List[str] = args['config_paths']
+       config_path: str = args['config_path']
        component: str = args['component']
        short_name: str = args['short_name']
        start_date: str = args['start_date']
        clearup: bool = args['clearup']
+       is_live: bool = args["live"]
 
        # load universal configuration
        sys_config = parse_json_file_with_tokens(sys_config_path)
@@ -233,7 +228,6 @@ class ProcessorBase:
        job_path: str = f'{workspace_path}{short_name}_{start_date}'
        self.prepare_job_directory(job_path)
 
-       is_live: bool = args["live"]
        log_file_path = f"{job_path}/log.txt"
 
        """
@@ -260,8 +254,8 @@ class ProcessorBase:
        ready = self.process_pre_job(args)
 
        # lock job directory
-       status: Jobstatus
-       with Jobstatus(job_path) as status:
+       status: Jobstatus = Jobstatus(job_path)
+       with status:
 
            # check for a status file in job directory
            if status.had_initial_status:
@@ -279,80 +273,87 @@ class ProcessorBase:
 
                end_job(status, ignore_inprogress = True, premature = False)
 
-           # files and directories that will be earmarked for removal after a
-           # successful job
+           logger.info('Starting to work on the configuration')
+
+           logger.info(f'Working on config {config_path}')
+
+           try:
+               # read the run config json, and replace any tokens with values from the sys_config (such as the
+               # directory root for the installation)
+               config: dict = parse_json_file_with_tokens(config_path, sys_config)
+
+               # then add the sys_config keys and values to the configjson
+               for k, v in sys_config.items():
+                   if k not in config.keys():
+                       config[k] = v
+                   else:
+                       logger.warning(f"Key {k} already present in run config - not adding key with same name from "
+                                      f"the sys_config")
+
+               """
+               then add the args keys and values to the configjson; we will override any keys that are already
+               in the configjson
+               """
+               for k, v in args.items():
+                   if k in config.keys():
+                       logger.warning(f"Key {k} already present in run config - overriding key with same name from "
+                                      f"the sys args")
+                   config[k] = v
+
+           except:
+               logger.exception(f"Failure in opening or checking config {config_path}")
+               # TODO: This case should test flagdir.jobStatus.__exit__()
+               raise  # endJob('ERROR',premature=True)
+
+           # provide specific case details to template config
+
+           # from configtemplate create configFileName to describe the specific job
+           config_file_name = f"{os.path.basename(config_path).replace('.json', '')}_{component}"
+
+           config['ConfigFilePath'] = config_file_name
+
+           # write the complete configuration file to job directory
+           with open(f"{job_path}/{config_file_name}.json", 'w') as write_file:
+               json.dump(config, write_file, indent = 4)
+
+           # proc_description = universal_config['ProcessInJob']
+           proc_description = 'ProcessInJob'
+           try:
+               proc_out: dict = self.process_in_job(job_path, status, config, component)
+           except:
+               logger.exception(f"Error in {proc_description}()")
+               status.reset('ERROR')
+               end_job(status, premature = True)
+
+           # Set default case
+           # This would be improved by implementing a class structure - note that the proc_out is not currently used
+           # as we now deal with copying outputs elsewhere. Keeping it here as a placeholder.
+           if proc_out is None:
+               proc_out = {
+                   'output': None,
+                   'clearup': None}
+           paths_to_clear = []
+           if 'clearup' in proc_out.keys():
+               append_item_to_list(
+                       proc_out['clearup'],
+                       paths_to_clear,
+                       proc_description,
+                       status)
 
-           logger.info('Starting to work on each configuration')
+           # Run EWS-plotting command
+
+           proc_description = 'ProcessEWSPlotting'
+           try:
+               self.process_post_job(job_path, config)
+           except:
+               logger.exception(f"Error in {proc_description}()")
+               status.reset('ERROR')
+               end_job(status, premature = True)
+
+           logger.info('Finished with EWS-Plotting, appending images to list for transfer')
 
-           for configIndex, configtemplate in enumerate(config_paths):
-
-               config_paths_length: int = len(config_paths)
-               logger.info(f'Working on config {configIndex + 1} of {config_paths_length}')
-
-               try:
-                   config: dict = parse_json_file_with_tokens(configtemplate, sys_config)
-                   # then add the sys_config keys and values to the configjson
-                   for k, v in sys_config.items():
-                       if k not in config.keys():
-                           config[k] = v
-                       else:
-                           logger.warning(f"Key {k} already present in run config - not adding key with same name from "
-                                          f"the sys_config")
-
-               except:
-                   logger.exception(f"Failure in opening or checking config {configtemplate}")
-                   # TODO: This case should test flagdir.jobStatus.__exit__()
-                   raise  # endJob('ERROR',premature=True)
-
-               # provide specific case details to template config
-
-               # from configtemplate create configFileName to describe the specific job
-               config_file_name = f"{os.path.basename(configtemplate).replace('.json', '')}_{component}"
-
-               config['ConfigFilePath'] = config_file_name
-
-               # write the complete configuration file to job directory
-               with open(f"{job_path}/{config_file_name}.json", 'w') as write_file:
-                   json.dump(config, write_file, indent = 4)
-
-               # proc_description = universal_config['ProcessInJob']
-               proc_description = 'ProcessInJob'
-               try:
-                   proc_out: dict = self.process_in_job(job_path, status, config, component)
-               except:
-                   logger.exception(f"Error in process_in_job")
-                   status.reset('ERROR')
-                   end_job(status, premature = True)
-
-               # Set default case
-               # This would be improved by implementing a class structure - not ethat the proc_out is not curently used
-               # as we now deal with copying outputs elsewhere. Keeping it here as a placeholder.
-               if proc_out is None:
-                   proc_out = {
-                       'output': None,
-                       'clearup': None}
-
-               if 'clearup' in proc_out.keys():
-                   append_item_to_list(
-                           proc_out['clearup'],
-                           paths_to_clear,
-                           proc_description,
-                           status)
-
-               # Run EWS-plotting command
-
-               proc_description = 'ProcessEWSPlotting'
-               try:
-                   self.process_post_job(job_path, config)
-               except:
-                   logger.exception(f"Error in {proc_description}()")
-                   status.reset('ERROR')
-                   end_job(status, premature = True)
-
-               logger.info('Finished with EWS-Plotting, appending images to list for transfer')
-
-               logger.info(f'Finished with config {configIndex + 1} of {config_paths_length}')
+           logger.info(f'Finished with config {config_path}')
 
            status.reset('SUCCESS')
 
diff --git a/ews/coordinator/processor_epidemiology.py b/ews/coordinator/processor_epidemiology.py
index 723bc3df240801dd598c5aacf65ad7231468a752..0a2aa23572611304dab5082164d59652a0fead11 100644
--- a/ews/coordinator/processor_epidemiology.py
+++ b/ews/coordinator/processor_epidemiology.py
@@ -52,29 +52,29 @@ class ProcessorEpidemiology(ProcessorBase):
 
        logger.info('started process_pre_job_epi()')
 
-       # check pre-requisite jobs are complete
-       # query_past_successes(input_args, 'Deposition')
-       #
-       # query_past_successes(input_args, 'Environment')
+       config_file: str = input_args['config_path']
 
-       config_fns: List[str] = input_args['config_paths']
+       # the config file should be valid if the script made it this far, no need for a try block
+       config: dict = parse_json_file_with_tokens(config_file)
 
-       for configFile in config_fns:
+       # determine end time, from config file or the input args
+       arg_start_date: str = input_args['start_date']
+       if 'CalculationSpanDays' in config:
+           calc_span_days = config['CalculationSpanDays']
+       elif 'CalculationSpanDays' in input_args:
+           calc_span_days = input_args['CalculationSpanDays']
+       else:
+           logger.error('CalculationSpanDays not defined in config or input_args')
+           return False
 
-           # they should be working if the script made it this far, no need to try
-           config_i = parse_json_file_with_tokens(configFile)
+       assert len(calc_span_days) == 2
 
-           #determine end time, from config file
-           arg_start_date: str = input_args['start_date']
-           calc_span_days = config_i['CalculationSpanDays']
-           assert len(calc_span_days) == 2
+       start_time, end_time = calc_epi_date_range(arg_start_date, calc_span_days)
 
-           start_time, end_time = calc_epi_date_range(arg_start_date,calc_span_days)
-
-           # warn if it is a long timespan
-           date_diff = end_time - start_time
-           if date_diff.days > 100:
-               logger.warning("More than 100 days will be calculated over, likely longer than any single season")
+       # warn if it is a long timespan
+       date_diff = end_time - start_time
+       if date_diff.days > 100:
+           logger.warning("More than 100 days will be calculated over, likely longer than any single season")
 
        return True
 
@@ -286,14 +286,14 @@ class ProcessorEpidemiology(ProcessorBase):
        # to end date, so choose an apporporiate lister, e.g.
        # list_onefile_operational
 
-       is_continue = config.get('continue',False)
+       is_continue = config.get('continue', False)
 
        # initialise any needed variables
 
        reference_date_str = config['StartString']
        reference_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d')
 
-       start_date, end_date = calc_epi_date_range(reference_date_str,config['CalculationSpanDays'])
+       start_date, end_date = calc_epi_date_range(reference_date_str, config['CalculationSpanDays'])
 
        start_string = start_date.strftime('%Y-%m-%d-%H%M')
        start_string_short = start_date.strftime('%Y%m%d%H%M')
 
@@ -327,7 +327,7 @@ class ProcessorEpidemiology(ProcessorBase):
            config['SubRegionName'] = region
            config['DiseaseName'] = disease
 
-           config_epi = config.copy()
+           # config_epi = config.copy()
 
            # TODO: CAUTION: Any iterations (e.g. disease or sub-region) are hidden
            # in jobPath, and not retained in the config file. This is a provlem for
@@ -342,7 +342,7 @@ class ProcessorEpidemiology(ProcessorBase):
            logger.info(f"Preparing for epidemiology calc of {disease} in {region}")
 
            # create config_filename to describe job configuration
-           config_filename = self.create_epi_config_string(config,case_specific_path,start_string,end_string)
+           config_filename = self.create_epi_config_string(config, case_specific_path, start_string, end_string)
 
            # prepare a directory for input data
            jobDataPath = f"{case_specific_path}/input_data/"
@@ -352,7 +352,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
            # configure filename of prepared deposition data
 
-           if 'Deposition' in config_epi:
+           if 'Deposition' in config:
 
                # determine which variable name to load for this disease
                disease_idx = [i for i,j in enumerate(diseases) if j==disease][0]
@@ -361,7 +361,6 @@ class ProcessorEpidemiology(ProcessorBase):
 
                try:
                    prep.gather_dependent_models(
-                           config_epi,
                            config,
                            variable_name,
                            start_date,
@@ -378,13 +377,12 @@ class ProcessorEpidemiology(ProcessorBase):
 
            # configure filename of prepared environment data
 
-           if 'Environment' in config_epi:
+           if 'Environment' in config:
 
                logger.info('Preparing environmental suitability data')
 
                try:
                    prep.gather_dependent_models(
-                           config_epi,
                            config,
                            variable_name,
                            start_date,
@@ -406,13 +404,13 @@ class ProcessorEpidemiology(ProcessorBase):
 
            logger.info('Preparing a copy of the host raster data')
 
            # TargetRaster defines the grid that the epi model works on.
-           assert 'TargetRaster' in config_epi['Host']
+           assert 'TargetRaster' in config['Host']
 
            # It should have been generated in advance by the user, by reprojecting
            # the available host map (e.g. MapSPAM) to the NAME output grid.
            # wheat_raster_reprojection.py is available to support this.
 
-           if 'HostRasters' in config_epi['Host']:
+           if 'HostRasters' in config['Host']:
                # HostRasters is a dictionary with date: filename entries describing
                # different host rasters valid at different times i.e. a simple
                # representation of dynamic host, so prepare a host file as is done
@@ -420,7 +418,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
                # All host maps should have the same spatial grid as the TargetRaster
 
-               rasters_dict = config_epi['Host']['HostRasters']
+               rasters_dict = config['Host']['HostRasters']
 
                dst_host_csv = f"{jobDataPath}/data_input_host.csv"
 
@@ -429,13 +427,13 @@ class ProcessorEpidemiology(ProcessorBase):
            else:
                # There is a host raster applicable to all times, i.e. static host
 
-               src_host = config_epi['Host']['TargetRaster']
+               src_host = config['Host']['TargetRaster']
                fn_host = os.path.basename(src_host)
                dst_host = f"{jobDataPath}/{fn_host}"
 
                # copy the tif to the job directory and refer to that instead
                shutil.copyfile(src_host,dst_host)
 
-               config_epi['Host']['TargetRaster'] = dst_host
+               config['Host']['TargetRaster'] = dst_host
 
            logger.info('Preparing a copy of the host data as csv')
 
@@ -445,15 +443,15 @@ class ProcessorEpidemiology(ProcessorBase):
                    {'201001010000': dst_host},
                    dst_host_csv)
 
-           config_epi['Host']['HostCSV'] = dst_host_csv
-           config_epi['Host']['FileNamePrepared'] = dst_host_csv
+           config['Host']['HostCSV'] = dst_host_csv
+           config['Host']['FileNamePrepared'] = dst_host_csv
 
            # Preparing any continue-run files
            if is_continue is True:
 
                logger.debug('This is a continue run.')
 
-               for ci in config_epi['Epi']:
+               for ci in config['Epi']:
 
                    model_name = ci['model']
 
@@ -463,7 +461,6 @@ class ProcessorEpidemiology(ProcessorBase):
 
                    try:
                        prep.gather_dependent_models(
-                               config_epi,
                                config,
                                variable_name,
                                start_date,
@@ -482,12 +479,12 @@ class ProcessorEpidemiology(ProcessorBase):
 
            # provide fundamental config elements to config_epi
            for k,v in config.items():
                if k not in short_name.keys():
-                   config_epi[k]=v
+                   config[k] = v
 
            if is_continue is True:
                # This will be used in the epi model and not the data preparation
                continue_start_date = reference_date+datetime.timedelta(hours=3)
-               config_epi['ContinueStartTime'] = continue_start_date.strftime('%Y-%m-%d-%H%M')
+               config['ContinueStartTime'] = continue_start_date.strftime('%Y-%m-%d-%H%M')
 
            def print_item(item):
                logger.debug(f"Item {item}")
@@ -503,16 +500,16 @@ class ProcessorEpidemiology(ProcessorBase):
 
            logger.debug('Incremental configuration looks like:')
-           iterate(config_epi)
+           iterate(config)
 
            logger.debug('Complete configuration looks like:')
-           logger.debug(json.dumps(config_epi,indent=2))
+           logger.debug(json.dumps(config, indent=2))
 
            # write the complete configuration file to job directory
            run_config_fn = f"{case_specific_path}/{config_filename}.json"
            with open(run_config_fn,'w') as write_file:
-               json.dump(config_epi,write_file,indent=4)
+               json.dump(config, write_file, indent=4)
 
            # run epi model
 
@@ -535,7 +532,7 @@ class ProcessorEpidemiology(ProcessorBase):
        def calc_mean(arr):
            return 'mean', arr.mean()
 
-       for epiconf in config_epi['Epi']:
+       for epiconf in config['Epi']:
 
            outfile = epiconf["infectionRasterFileName"]
 
@@ -565,7 +562,7 @@ class ProcessorEpidemiology(ProcessorBase):
        figure_func = getattr(EpiAnalysis, 'plot_compare_epi_cases')
 
        # isolate the config for this function, in case of modifications
-       config_epi_for_comparison = config_epi.copy()
+       config_epi_for_comparison = config.copy()
 
        fig,axes,cases = figure_func(
            config_epi_for_comparison,
@@ -578,7 +575,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
        # slice the epi results into before forecast and in forecast
 
-       for epiconf in config_epi['Epi']:
+       for epiconf in config['Epi']:
            unit_description = ''
 
            if epiconf.get('rescale_output_by_host_raster',False) is True:
@@ -587,12 +584,12 @@ class ProcessorEpidemiology(ProcessorBase):
 
            # load the full epi results
            df_full = read_csv(outfile,header=[0],index_col=[0,1])
-           column_date_fmt = f"{config_epi['StartTimeShort']}_%Y%m%d%H%M"
+           column_date_fmt = f"{config['StartTimeShort']}_%Y%m%d%H%M"
            df_full_dates = to_datetime(df_full.columns.astype('str'),format=column_date_fmt)
 
            # determine date to cut with
            # plus 1 minute so midnight is associated with preceding day
-           date_to_cut = datetime.datetime.strptime(config_epi['StartString']+'0001','%Y%m%d%H%M')
+           date_to_cut = datetime.datetime.strptime(config['StartString'] + '0001', '%Y%m%d%H%M')
            dates_after_cut = df_full_dates >= date_to_cut
            idx = argmax(dates_after_cut)-1
 
diff --git a/tests/integration/partial/integration_test_utils.py b/tests/integration/partial/integration_test_utils.py
index 54e48dde6ba9d1537c471be4da184902ad888057..5b4b000c3e3258143f270d753fa2f97e3d9ea315 100644
--- a/tests/integration/partial/integration_test_utils.py
+++ b/tests/integration/partial/integration_test_utils.py
@@ -218,7 +218,7 @@ class IntegrationTestUtils:
 
        args_dict: dict = {}
 
        sys_config_path = IntegrationTestUtils.RUN_SYS_CONFIG_FILE_PATH
-       config_paths = [IntegrationTestUtils.RUN_CONFIG_FILE_PATH]
+       config_path = IntegrationTestUtils.RUN_CONFIG_FILE_PATH
 
        # note, possible to override these values in the kwargs loop below
        args_dict['live'] = False
@@ -227,7 +227,7 @@ class IntegrationTestUtils:
        args_dict['component'] = component
        args_dict['short_name'] = shortname
        args_dict['sys_config_path'] = sys_config_path
-       args_dict['config_paths'] = config_paths
+       args_dict['config_path'] = config_path
        args_dict['log_level'] = 'info'
        args_dict['clearup'] = True
 
@@ -260,7 +260,7 @@ class IntegrationTestUtils:
        args_dict['component'] = component
        args_dict['short_name'] = short_name
        args_dict['sys_config_path'] = IntegrationTestUtils.RUN_SYS_CONFIG_FILE_PATH
-       args_dict['config_paths'] = [IntegrationTestUtils.RUN_CONFIG_FILE_PATH]
+       args_dict['config_path'] = IntegrationTestUtils.RUN_CONFIG_FILE_PATH
        args_dict['log_level'] = 'info'
        args_dict['clearup'] = True
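
To try the reworked single-config interface locally, the sketch below mirrors how integration_test_utils.py assembles the argument dictionary after this change. It assumes run_process() is the entry point that consumes this dict (the diff reads args['sys_config_path'] and friends there, and the new __init__ comment refers to run_process()); the file paths, start date, and the 'component'/'short_name' values are illustrative placeholders, not values taken from the repository.

    from ews.coordinator.processor_epidemiology import ProcessorEpidemiology

    # keys match those set in integration_test_utils.py after this change
    args_dict: dict = {
        'live': False,                          # read as is_live inside run_process()
        'start_date': '20240101',               # placeholder; %Y%m%d, same format as today_string
        'component': 'Epidemiology',            # placeholder component name
        'short_name': 'epi',                    # placeholder short name
        'sys_config_path': 'configs/sys_config.json',  # placeholder path
        'config_path': 'configs/epi_config.json',      # single path replaces the old config_paths list
        'log_level': 'info',
        'clearup': True,
    }

    processor = ProcessorEpidemiology()
    processor.run_process(args_dict)

On the command line, the equivalent change is that '-c/--config' now takes exactly one path: a job that previously passed several configs in one invocation (e.g. one each for stripe, stem, leaf, per the removed help text) now needs one invocation per config file.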