From b170213309196d53f6e5dd38daa04f10dee62374 Mon Sep 17 00:00:00 2001 From: jws52 <jws52@cam.ac.uk> Date: Mon, 4 Sep 2023 15:21:21 +0100 Subject: [PATCH] refactor: Utilise epi updater to speed up runs See EpiModel.EpiPrep.updater.update_input() docstring for comments. This should substantially speed up production epi runs, particularly towards the end of the season. This commit also unifies deposition and env suit preparation code. --- coordinator/ProcessorEpidemiology.py | 141 +++++++++++++++++---------- 1 file changed, 91 insertions(+), 50 deletions(-) diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py index 7dad3c3..7b1cb2d 100644 --- a/coordinator/ProcessorEpidemiology.py +++ b/coordinator/ProcessorEpidemiology.py @@ -21,7 +21,7 @@ from EpiModel import ( # created by rs481 EpiModel, plotRaster ) -from EpiModel.EpiPrep import lister, loader, prep +from EpiModel.EpiPrep import lister, loader, prep, updater from ews_postprocessing.epi.epi_post_processor import EPIPostPostProcessor from ProcessorUtils import ( @@ -257,20 +257,28 @@ def process_in_job_epi(jobPath,status,config,component): config['StartTimeShort'] = start_string_short config['EndTime'] = end_string + yesterday_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d') - datetime.timedelta(days=1) + yesterday_string = yesterday_date.strftime('%Y%m%d') + diseases = config['Epidemiology']['DiseaseNames'] - def gather_deposition(config_epi,config,variable_name,start_date,end_date,jobDataPath,status): + def gather_dependent_models(config_epi,config,variable_name,start_date,reference_date,end_date,jobDataPath,lastjobDataPath,status,component='Deposition'): + + # This function is only prepared for components in this list + assert component in ['Deposition','Environment'] + + # TODO: Simplify the set of required arguments. Check if config is necessary. - # TODO: Simplify the set of required arguments . Check if config is necessary. 
+ config_epi[component]['VariableName'] = variable_name # disease_latin_name_dict[disease]+'_DEPOSITION' - config_epi['Deposition']['VariableName'] = variable_name # disease_latin_name_dict[disease]+'_DEPOSITION' + config_epi[component]['FileNamePrepared'] = f"{jobDataPath}/data_input_{component.lower()}.csv" - config_epi['Deposition']['FileNamePrepared'] = f"{jobDataPath}/data_input_deposition.csv" + config_epi[component]['LastFileNamePrepared'] = f"{lastjobDataPath}/data_input_{component.lower()}.csv" # Use config-defined file lister - file_lister_dep_name = config_epi['Deposition']['FileListerFunction'] + file_lister_name = config_epi[component]['FileListerFunction'] - file_lister_dep = getattr(lister,file_lister_dep_name) + file_lister_func = getattr(lister,file_lister_name) config_for_lister = config.copy() config_for_lister.update(config_epi) @@ -279,25 +287,60 @@ def process_in_job_epi(jobPath,status,config,component): lister_kwargs['reference_date']=config['ReferenceTime'] loader_kwargs= {} - loader_kwargs['VariableName']= config_for_lister['Deposition'].get('VariableName') - loader_kwargs['VariableNameAlternative']= config_for_lister['Deposition'].get('VariableNameAlternative') - try: + loader_dict = { + 'Deposition' : loader.load_NAME_file, + 'Environment' : loader.load_env_file, + } + + loader_func = loader_dict[component] - prep.prep_input(config_for_lister,start_date,end_date, - component='Deposition', - file_lister=file_lister_dep, - file_loader=loader.load_NAME_file, + # Provide component-specific variables + if component == 'Deposition': + + loader_kwargs['VariableName']= config_for_lister[component].get('VariableName') + loader_kwargs['VariableNameAlternative']= config_for_lister[component].get('VariableNameAlternative') + + try: + # Make use of data prepared yesterday + updater.update_input( + config_for_lister, + reference_date, + end_date, + component=component, + file_lister=file_lister_func, + file_loader=loader_func, 
lister_kwargs=lister_kwargs, + update_period_days=3, **loader_kwargs) - assert os.path.isfile(config_epi['Deposition']['FileNamePrepared']) + assert os.path.isfile(config_epi[component]['FileNamePrepared']) - except: + except AssertionError: + + logger.exception(f"Unexpected error in {component} data preparation (updater)") - logger.exception(f"Unexpected error in deposition data preparation") - status.reset('ERROR') - endJob(status,premature=True) + # Perform a fresh load of the full time series + + try: + + prep.prep_input( + config_for_lister, + start_date, + end_date, + component=component, + file_lister=file_lister_func, + file_loader=loader_func, + lister_kwargs=lister_kwargs, + **loader_kwargs) + + assert os.path.isfile(config_epi[component]['FileNamePrepared']) + + except: + + logger.exception(f"Unexpected error in {component} data preparation (full load)") + status.reset('ERROR') + endJob(status,premature=True) return @@ -314,6 +357,9 @@ def process_in_job_epi(jobPath,status,config,component): assert disease in disease_latin_name_dict.keys() + config['SubRegionName'] = region + config['DiseaseName'] = disease + config_epi = config['Epidemiology'].copy() # TODO: CAUTION: Any iterations (e.g. 
disease or sub-region) are hidden @@ -335,6 +381,8 @@ def process_in_job_epi(jobPath,status,config,component): jobDataPath = f"{case_specific_path}/input_data/" Path(jobDataPath).mkdir(parents=True, exist_ok=True) + lastjobDataPath = jobDataPath.replace(f"EPI_{reference_date_str}",f"EPI_{yesterday_string}") + # configure filename of prepared deposition data if 'Deposition' in config_epi: @@ -344,43 +392,36 @@ def process_in_job_epi(jobPath,status,config,component): variable_name = depo_variable_names[disease_idx] - gather_deposition(config_epi,config,variable_name,start_date,end_date,jobDataPath,status) - + gather_dependent_models( + config_epi, + config, + variable_name, + start_date, + reference_date, + end_date, + jobDataPath, + lastjobDataPath, + status, + component='Deposition') + # configure filename of prepared deposition data if 'Environment' in config_epi: logger.info('Preparing environmental suitability data') - config_epi['SubRegionName'] = region - - config_epi['DiseaseName'] = disease - - config_epi['Environment']['FileNamePrepared'] = f"{jobDataPath}/data_input_environment.csv" - - # Use config-defined file lister - file_lister_env_name = config_epi['Environment']['FileListerFunction'] - - file_lister_env = getattr(lister,file_lister_env_name) - - config_for_lister = config.copy() - config_for_lister.update(config_epi) - - try: - - prep.prep_input(config_for_lister,start_date,end_date, - component='Environment', - file_loader=loader.load_env_file, - file_lister=file_lister_env) - - assert os.path.isfile(config_epi['Environment']['FileNamePrepared']) - - except: - - logger.exception(f"Unexpected error in env data preparation") - status.reset('ERROR') - endJob(status,premature=True) - + gather_dependent_models( + config_epi, + config, + variable_name, + start_date, + reference_date, + end_date, + jobDataPath, + lastjobDataPath, + status, + component='Environment') + # prepare a copy of the host data logger.info('Preparing a copy of the host raster 
data') -- GitLab