diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/ProcessorAdvisory.py index a5260965fe267aacaf65ecf5a1860f4d23783398..4d78e795b83c52a2c5fda0d1c54375f15b9c7305 100644 --- a/coordinator/ProcessorAdvisory.py +++ b/coordinator/ProcessorAdvisory.py @@ -6,43 +6,62 @@ import logging # gitlab projects # TODO: Package these projects so they are robust for importing from AdvisoryBuilder import DataGatherer # created by jws52 +from Processor import Processor from ProcessorUtils import add_filters_to_sublogger, short_name -logger = logging.getLogger('Processor.Advisory') -add_filters_to_sublogger(logger) -def process_in_job_advisory(jobPath,status,config,component): - '''Generates a word processor file containing some basic survey statistics - and output figures from deposition, environmental suitability, and - eventually also the epi model. This template advisory is intended to speed - up the process of writing advisories. The intended user is a local expert - who edits the content of the document. - Uses the gitlab project EWS-advisory-builder.''' +class ProcessorAdvisory(Processor): - config_advisory = config[component].copy() + def process_pre_job(self, args): + return True - config_advisory['jobPath'] = jobPath - # provide top-level arguments to advisory config - for k,v in config.items(): - if k not in short_name.keys(): - config_advisory[k]=v + def process_in_job(self, jobPath, status, configjson, component) -> object: + self.process_in_job_advisory(jobPath, status, configjson, component) - dateString = config['StartString'] - layout = 'tight' + def process_post_job(self, jobPath, configjson): + pass - logging.info(f"Running for scenario {config_advisory}, {dateString}, {layout}") - report_names = DataGatherer.run_each_subregion(config_advisory, dateString, layout) + def __init__(self) -> None: + super().__init__() + logger = logging.getLogger('Processor.Advisory') + add_filters_to_sublogger(logger) - # pass the report filenames to upload to the remote server - proc_out = {} - # Output files available for upload - proc_out['output'] = report_names - # Processing files available for clearing - proc_out['clearup'] = None + def process_in_job_advisory(self, jobPath, status, config, component): + '''Generates a word processor file containing some basic survey statistics + and output figures from deposition, environmental suitability, and + eventually also the epi model. This template advisory is intended to speed + up the process of writing advisories. The intended user is a local expert + who edits the content of the document. 
+ Uses the gitlab project EWS-advisory-builder.''' - return proc_out + config_advisory = config[component].copy() + + config_advisory['jobPath'] = jobPath + + # provide top-level arguments to advisory config + for k,v in config.items(): + if k not in short_name.keys(): + config_advisory[k]=v + + dateString = config['StartString'] + + layout = 'tight' + + logging.info(f"Running for scenario {config_advisory}, {dateString}, {layout}") + + report_names = DataGatherer.run_each_subregion(config_advisory, dateString, layout) + + # pass the report filenames to upload to the remote server + + proc_out = {} + # Output files available for upload + proc_out['output'] = report_names + # Processing files available for clearing + proc_out['clearup'] = None + + return proc_out diff --git a/coordinator/ProcessorComponents.py b/coordinator/ProcessorComponents.py index bea3f117a4e97b805c744cec4fd819fb3a37704d..4baf947e7f4da4b7bfaad4f40a522f11de03e69b 100644 --- a/coordinator/ProcessorComponents.py +++ b/coordinator/ProcessorComponents.py @@ -11,24 +11,24 @@ from typing import List # All of the process_* functions are callable from config files for the three # coordinator stages: pre, in (during) and plotting. -from ProcessorAdvisory import ( - process_in_job_advisory -) +# from ProcessorAdvisory import ( +# process_in_job_advisory +# ) #from ProcessorDeposition import ( # process_in_job_dep, # process_EWS_plotting_dep #) -from ProcessorEnvironment import ( - process_in_job_env2_0, - process_copy_past_job_env2_0, - process_EWS_plotting_env2_0 -) -from ProcessorEpidemiology import ( - process_pre_job_epi, - process_in_job_epi, - process_EWS_plotting_epi, -) +#from ProcessorEnvironment import ( +# process_in_job_env2_0, +# process_copy_past_job_env2_0, +# process_EWS_plotting_env2_0 +#) +# from ProcessorEpidemiology import ( +# process_pre_job_epi, +# process_in_job_epi, +# process_EWS_plotting_epi, +# ) from ProcessorScraper import ( process_in_job_media_scraper, ) @@ -36,11 +36,11 @@ from ProcessorServer import ( process_pre_job_server_download, upload ) -from ProcessorSurveys import ( - process_pre_job_survey, - process_in_job_survey, - process_EWS_plotting_survey -) +# from ProcessorSurveys import ( +# process_pre_job_survey, +# process_in_job_survey, +# process_EWS_plotting_survey +# ) from ProcessorUtils import ( add_filters_to_sublogger, query_past_successes diff --git a/coordinator/ProcessorEnvironment.py b/coordinator/ProcessorEnvironment.py index f356f7ba25d05791c2203d58f773f4bc854b8f25..d76205ed8a3970aa5b240cee8b68f530f2bc3f5b 100644 --- a/coordinator/ProcessorEnvironment.py +++ b/coordinator/ProcessorEnvironment.py @@ -12,211 +12,228 @@ import tarfile import iris from iris.cube import CubeList +from Processor import Processor +from ProcessorServer import process_pre_job_server_download from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo import EnvSuitPipeline as esp from ProcessorUtils import ( - get_only_existing_globs, - subprocess_and_log, - add_filters_to_sublogger, - remove_path_from_tar_members, - short_name + get_only_existing_globs, + subprocess_and_log, + remove_path_from_tar_members, + short_name, add_filters_to_sublogger ) -logger = logging.getLogger('Processor.Environment') -add_filters_to_sublogger(logger) - -def process_in_job_env2_0(jobPath,status,config,component): - '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.''' - - 
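The pattern this changeset applies to every coordinator module is the same: the module-level process_* functions move into a subclass of Processor that implements the pre-job, in-job and post-job hooks, and the sublogger setup moves into __init__. A minimal sketch of the pattern, assuming only what the diff itself shows (Processor and add_filters_to_sublogger are this repository's own modules, the hook signatures are copied from the new classes, and ProcessorExample is a made-up name):

import logging

from Processor import Processor
from ProcessorUtils import add_filters_to_sublogger


class ProcessorExample(Processor):
    # hypothetical component, for illustration only

    def __init__(self) -> None:
        super().__init__()
        logger = logging.getLogger('Processor.Example')
        add_filters_to_sublogger(logger)

    def process_pre_job(self, args) -> bool:
        # return True once the job is ready for full processing
        return True

    def process_in_job(self, jobPath, status, configjson, component) -> object:
        # do the component's work; report files for upload and clear-up
        return {'output': None, 'clearup': None}

    def process_post_job(self, jobPath, configjson):
        # plotting / publishing stage; return a list of files for transfer
        return []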
logger.info('started process_in_job_env2_0()') - - logger.info('Copying file from remote server to job directory') - - file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) - file_name = Template(config[component]['InputFileTemplate']).substitute(**config) - - #TODO: check if file exists already (may be the case for multiple configs in one) - - # TODO: perform ssh file transfer in python instead of subprocess - server_name: str = config['ServerName'] - if server_name == "": - cmd_scp: list = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath] - else: - cmd_scp: list = ["scp", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", - f"{config['ServerName']}:{file_path}/{file_name}.tar.gz", jobPath] - - description_short = 'env2 scp' - description_long = 'Copying file from remote server to job directory' - # lawrence comment in/out - subprocess_and_log(cmd_scp,description_short, description_long) - - logger.info('untarring the input file') - - # untar incoming name data - output_directory = f"{jobPath}/NAME_Met_as_netcdf" - Path(output_directory).mkdir(parents=True, exist_ok=True) - tarfile_name = f"{jobPath}/{file_name}.tar.gz" - with tarfile.open(tarfile_name) as tar: - members = remove_path_from_tar_members(tar) - tar.extractall(output_directory, members = members) - - # basic check that contents are as expected for 7-day forecast (57 timepoints in all files) - cube_wildcard = f"{output_directory}/*.nc" - cubes: CubeList = iris.load(cube_wildcard) - - # land_fraction and topography will only have a single timepoint (as these dont change over time), so we can ignore - # these when sense-checking the expected number of timepoints - ignore_list = ["LAND_FRACTION", "TOPOGRAPHY"] - - for cube in cubes: - var_name = cube.name() - coord = cube.coord("time") - timepoint_count = coord.shape[0] - if timepoint_count != 57 and var_name not in ignore_list: - msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}" - logger.error(msg) - raise RuntimeError(msg) - - region = config['RegionName'] - - logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear") - - pipeline_config = config["Environment"] - try: - #todo lawrence comment this back to original (extracted=False) - esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False) - except: - logger.exception(f"Some failure when running EnvSuitPipeline.py") - raise - - logger.info('Finished running environmental suitability 2.0') - - # TODO: Check that the output appears as expected +class ProcessorEnvironment(Processor): - proc_out = {} - # Output files available for upload - proc_out['output'] = None - # Processing files available for clearing - proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"] - - return proc_out - -def process_copy_past_job_env2_0(jobPath,status,config,component): - '''For when we want to skip process_in_job() to test the other components of - this script. 
Currently hard-wired.''' - - # TODO: remove this hard-wired assumption - jobPath_to_copy = f"{jobPath}/../{short_name['Environment']}_{config['StartString']}_bak/" - - assert os.path.exists(jobPath_to_copy) - - dir_src = f"{jobPath_to_copy}/processed/" - - dir_dst = f"{jobPath}/processed/" - - logger.info(f"Copying from {dir_src}") - - logger.info(f"to {dir_dst}") + def process_pre_job(self, args): + return process_pre_job_server_download(args) - copy_tree(dir_src,dir_dst) - logger.info('Copying complete') + def process_in_job(self, jobPath, status, configjson, component) -> object: + return self.process_in_job_env2_0(jobPath, status, configjson, component) - proc_out = {} - # Output files available for upload - proc_out['output'] = None - # Processing files available for clearing - proc_out['clearup'] = None - return proc_out + def process_post_job(self, jobPath, configjson): + return self.process_EWS_plotting_env2_0(jobPath, configjson) -'''class EWSPlottingEnvSuit(EWSPlottingEnvSuitBase): - def set_custom_params(self, - sys_params_dict: dict, - chart_params_dict: dict, - run_params_dict: dict, - disease_csv_template_arg: str, - diseases: List[EnvSuitDiseaseInfo]): - # this is unique to the asia/east africa env suit, as we are not filtering within country boundaries - run_params_dict[RUN_PARAMS.FILTER_FOR_COUNTRY_KEY] = "False"''' - -#TODO test if this works -def process_EWS_plotting_env2_0(jobPath,config): - '''Configures the plotting arguments and calls EWS-plotting as a python module. - Returns a list of output files for transfer.''' - - logger.info('started process_EWS_plotting_env2_0()') - - main_region = config['RegionName'] - - input_dir = f"{jobPath}/processed/{main_region}" - - subregions = config['SubRegionNames'] - - EWSPlottingOutputGlobs = [] - - # work on each region - for region in subregions: - - output_dir = f"{jobPath}/plotting/{region.lower()}" - csv_template_dir = input_dir + "/{DISEASE_DIR}/RIE_value.csv" - - Path(output_dir).mkdir(parents=True, exist_ok=True) - - sys_config = config['Environment']['EWS-Plotting']['SysConfig'] - run_config = config['Environment']['EWS-Plotting']['RunConfig'] - chart_config = config['Environment']['EWS-Plotting'][region]['ChartConfig'] - filter_for_country = config['Environment']['EWS-Plotting'][region]['FilterForCountry'] - - # Note that this runs all disease types available - - logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}") - - env_suit_processor = EnvSuitPostProcessor() - env_suit_processor.set_param_config_files(sys_params_file_arg = sys_config, - chart_params_file_arg = chart_config, - run_params_file_arg = run_config, - es_output_dir_arg = output_dir, - issue_date_arg = config['StartString'], - disease_csv_template_arg = csv_template_dir) - - env_suit_processor.run_params.FILTER_FOR_COUNTRY = (filter_for_country.upper() == "TRUE") - - # Include further diseases in plotting. In this case the irrigated suitabilite for the rusts. 
- # TODO: move this part out into a config - extra_diseases = [ - EnvSuitDiseaseInfo("Stem rust temp-only", "stem_rust_temponly", config['StartString'], "StemRust_TempOnly", csv_template_dir), - EnvSuitDiseaseInfo("Leaf rust temp-only", "leaf_rust_temponly", config['StartString'], "LeafRust_TempOnly", csv_template_dir), - EnvSuitDiseaseInfo("Stripe rust temp-only", "stripe_temponly", config['StartString'], "StripeRust_TempOnly", csv_template_dir), - ] - - env_suit_processor.add_diseases(diseases=extra_diseases) - - env_suit_processor.process() + def __init__(self) -> None: + super().__init__() + logger = logging.getLogger('Processor.Environment') + add_filters_to_sublogger(logger) + def process_in_job_env2_0(self, jobPath,status,config,component): + '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.''' + + self.logger.info('started process_in_job_env2_0()') + + self.logger.info('Copying file from remote server to job directory') + + file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) + file_name = Template(config[component]['InputFileTemplate']).substitute(**config) + + #TODO: check if file exists already (may be the case for multiple configs in one) + + # TODO: perform ssh file transfer in python instead of subprocess + server_name: str = config['ServerName'] + if server_name == "": + cmd_scp: list = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath] + else: + cmd_scp: list = ["scp", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", + f"{config['ServerName']}:{file_path}/{file_name}.tar.gz", jobPath] + + description_short = 'env2 scp' + description_long = 'Copying file from remote server to job directory' + # lawrence comment in/out + subprocess_and_log(cmd_scp,description_short, description_long) + + self.logger.info('untarring the input file') + + # untar incoming name data + output_directory = f"{jobPath}/NAME_Met_as_netcdf" + Path(output_directory).mkdir(parents=True, exist_ok=True) + tarfile_name = f"{jobPath}/{file_name}.tar.gz" + with tarfile.open(tarfile_name) as tar: + members = remove_path_from_tar_members(tar) + tar.extractall(output_directory, members = members) + + # basic check that contents are as expected for 7-day forecast (57 timepoints in all files) + cube_wildcard = f"{output_directory}/*.nc" + cubes: CubeList = iris.load(cube_wildcard) + + # land_fraction and topography will only have a single timepoint (as these dont change over time), so we can ignore + # these when sense-checking the expected number of timepoints + ignore_list = ["LAND_FRACTION", "TOPOGRAPHY"] + + for cube in cubes: + var_name = cube.name() + coord = cube.coord("time") + timepoint_count = coord.shape[0] + if timepoint_count != 57 and var_name not in ignore_list: + msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}" + self.logger.error(msg) + raise RuntimeError(msg) + + region = config['RegionName'] + + self.logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear") + + pipeline_config = config["Environment"] + try: + #todo lawrence comment this back to original (extracted=False) + esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False) + except: + self.logger.exception(f"Some failure when running EnvSuitPipeline.py") + raise + + self.logger.info('Finished running environmental suitability 2.0') + + # TODO: Check that the output appears as expected + + proc_out = {} + # Output files available for upload + proc_out['output'] = 
None + # Processing files available for clearing + proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"] + + return proc_out + + def process_copy_past_job_env2_0(self, jobPath,status,config,component): + '''For when we want to skip process_in_job() to test the other components of + this script. Currently hard-wired.''' + + # TODO: remove this hard-wired assumption + jobPath_to_copy = f"{jobPath}/../{short_name['Environment']}_{config['StartString']}_bak/" + + assert os.path.exists(jobPath_to_copy) + + dir_src = f"{jobPath_to_copy}/processed/" + + dir_dst = f"{jobPath}/processed/" + + self.logger.info(f"Copying from {dir_src}") + + self.logger.info(f"to {dir_dst}") + + copy_tree(dir_src,dir_dst) + + self.logger.info('Copying complete') + + proc_out = {} + # Output files available for upload + proc_out['output'] = None + # Processing files available for clearing + proc_out['clearup'] = None + + return proc_out + + '''class EWSPlottingEnvSuit(EWSPlottingEnvSuitBase): + + def set_custom_params(self, + sys_params_dict: dict, + chart_params_dict: dict, + run_params_dict: dict, + disease_csv_template_arg: str, + diseases: List[EnvSuitDiseaseInfo]): + # this is unique to the asia/east africa env suit, as we are not filtering within country boundaries + run_params_dict[RUN_PARAMS.FILTER_FOR_COUNTRY_KEY] = "False"''' + + #TODO test if this works + def process_EWS_plotting_env2_0(self, jobPath,config): + '''Configures the plotting arguments and calls EWS-plotting as a python module. + Returns a list of output files for transfer.''' + + self.logger.info('started process_EWS_plotting_env2_0()') + + main_region = config['RegionName'] + + input_dir = f"{jobPath}/processed/{main_region}" + + subregions = config['SubRegionNames'] + + EWSPlottingOutputGlobs = [] + + # work on each region + for region in subregions: + + output_dir = f"{jobPath}/plotting/{region.lower()}" + csv_template_dir = input_dir + "/{DISEASE_DIR}/RIE_value.csv" + + Path(output_dir).mkdir(parents=True, exist_ok=True) + + sys_config = config['Environment']['EWS-Plotting']['SysConfig'] + run_config = config['Environment']['EWS-Plotting']['RunConfig'] + chart_config = config['Environment']['EWS-Plotting'][region]['ChartConfig'] + filter_for_country = config['Environment']['EWS-Plotting'][region]['FilterForCountry'] + + # Note that this runs all disease types available + + self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}") + + env_suit_processor = EnvSuitPostProcessor() + env_suit_processor.set_param_config_files(sys_params_file_arg = sys_config, + chart_params_file_arg = chart_config, + run_params_file_arg = run_config, + es_output_dir_arg = output_dir, + issue_date_arg = config['StartString'], + disease_csv_template_arg = csv_template_dir) + + env_suit_processor.run_params.FILTER_FOR_COUNTRY = (filter_for_country.upper() == "TRUE") + + # Include further diseases in plotting. In this case the irrigated suitabilite for the rusts. 
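The hard-coded list of extra diseases below is flagged with a TODO to move it into configuration. One possible shape for that, shown only as a sketch: the 'ExtraDiseases' key is an assumption rather than an existing config entry, while the EnvSuitDiseaseInfo arguments mirror the hard-coded entries that follow.

# read the per-disease strings from config, falling back to the current
# hard-coded values; each entry carries the three strings that differ
# between the EnvSuitDiseaseInfo constructions below
extra_disease_entries = config['Environment']['EWS-Plotting'].get('ExtraDiseases', [
    ['Stem rust temp-only', 'stem_rust_temponly', 'StemRust_TempOnly'],
    ['Leaf rust temp-only', 'leaf_rust_temponly', 'LeafRust_TempOnly'],
    ['Stripe rust temp-only', 'stripe_temponly', 'StripeRust_TempOnly'],
])

extra_diseases = [
    EnvSuitDiseaseInfo(name, short, config['StartString'], subdir, csv_template_dir)
    for name, short, subdir in extra_disease_entries]

env_suit_processor.add_diseases(diseases=extra_diseases)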
+ # TODO: move this part out into a config + extra_diseases = [ + EnvSuitDiseaseInfo("Stem rust temp-only", "stem_rust_temponly", config['StartString'], "StemRust_TempOnly", csv_template_dir), + EnvSuitDiseaseInfo("Leaf rust temp-only", "leaf_rust_temponly", config['StartString'], "LeafRust_TempOnly", csv_template_dir), + EnvSuitDiseaseInfo("Stripe rust temp-only", "stripe_temponly", config['StartString'], "StripeRust_TempOnly", csv_template_dir), + ] + + env_suit_processor.add_diseases(diseases=extra_diseases) + + env_suit_processor.process() + + # check the output + EWSPlottingOutputDir = f"{output_dir}/images/" + #EWSPlottingOutputGlobs += [ + # # daily plots + # f"{EWSPlottingOutputDir}Daily/suitability_{region.lower()}_*_rust_daily_20*.png", + # # weekly plots + # f"{EWSPlottingOutputDir}Weekly/suitability_{region.lower()}_*_rust_total_20*.png"] + + EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"] + # check the output - EWSPlottingOutputDir = f"{output_dir}/images/" - #EWSPlottingOutputGlobs += [ - # # daily plots - # f"{EWSPlottingOutputDir}Daily/suitability_{region.lower()}_*_rust_daily_20*.png", - # # weekly plots - # f"{EWSPlottingOutputDir}Weekly/suitability_{region.lower()}_*_rust_total_20*.png"] - - EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"] - - # check the output - EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) - - # check there is some output from EWS-plotting - if not EWSPlottingOutputGlobs: - logger.error('EWS-Plotting did not produce any output') - raise RuntimeError - - # provide list for transfer - EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)]) - - return EWSPlottingOutputs + EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + + # check there is some output from EWS-plotting + if not EWSPlottingOutputGlobs: + self.logger.error('EWS-Plotting did not produce any output') + raise RuntimeError + + # provide list for transfer + EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)]) + + return EWSPlottingOutputs diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py index 23f193b8bdec60a2e096e6355425bec507cdc381..d80f827d8adc5969be8ec60299acd48cf2c4f38f 100644 --- a/coordinator/ProcessorEpidemiology.py +++ b/coordinator/ProcessorEpidemiology.py @@ -22,6 +22,7 @@ from EpiModel import ( # created by rs481 plotRaster ) from EpiModel.EpiPrep import lister, loader, prep, updater +from Processor import Processor from ews_postprocessing.epi.epi_post_processor import EPIPostPostProcessor from ProcessorUtils import ( @@ -34,781 +35,799 @@ from ProcessorUtils import ( disease_latin_name_dict ) -logger = logging.getLogger('Processor.Epi') -add_filters_to_sublogger(logger) +class ProcessorEpidemiology(Processor): + def process_pre_job(self, args): + return self.process_pre_job_epi(args) -def calc_epi_date_range(init_str,span_days=[0,6]): - '''Date range is determined relative to init_date. - span_days is usually defined in the job config file. Day zero is current - day, negative values point to past (historical or analysis) days, and - positive values point to forecast days. 
- Returns a start_date and end_date.''' - init_date = datetime.datetime.strptime(init_str,'%Y%m%d') + def process_in_job(self, jobPath, status, configjson, component) -> object: + return self.process_in_job_epi(jobPath, status, configjson, component) - # note that filename date represents preceding 3 hours, so day's data - # starts at file timestamp 0300 UTC - threehour_shift = datetime.timedelta(hours=3) - # add 24hrs so that final day is fully included - day_shift = datetime.timedelta(days=1) + def process_post_job(self, jobPath, configjson): + return self.process_EWS_plotting_epi(jobPath, configjson) - # if more than 999 days - if len(str(span_days[0]))>3: - # assume it is a date string - start_date = datetime.datetime.strptime(span_days[0]+'0300','%Y%m%d%H%M') - else: - date_shift0 = datetime.timedelta(days=span_days[0]) - start_date = init_date + date_shift0 + threehour_shift + def __init__(self) -> None: + super().__init__() + logger = logging.getLogger('Processor.Epi') + add_filters_to_sublogger(logger) - if len(str(span_days[1]))>3: - # assume it is a date string - end_date = datetime.strptime(span_days[1]+'0000','%Y%m%d%H%M') - end_date = end_date + day_shift - else: - date_shift1 = datetime.timedelta(days=span_days[1]) + def calc_epi_date_range(self, init_str, span_days = [0, 6]): + '''Date range is determined relative to init_date. + span_days is usually defined in the job config file. Day zero is current + day, negative values point to past (historical or analysis) days, and + positive values point to forecast days. + Returns a start_date and end_date.''' - end_date = init_date + date_shift1 +day_shift + init_date = datetime.datetime.strptime(init_str,'%Y%m%d') - return start_date, end_date + # note that filename date represents preceding 3 hours, so day's data + # starts at file timestamp 0300 UTC + threehour_shift = datetime.timedelta(hours=3) -def process_pre_job_epi(input_args: dict): - '''Returns a boolean as to whether the job is ready for full processing.''' + # add 24hrs so that final day is fully included + day_shift = datetime.timedelta(days=1) - logger.info('started process_pre_job_epi()') + # if more than 999 days + if len(str(span_days[0]))>3: + # assume it is a date string + start_date = datetime.datetime.strptime(self.span_days[0]+'0300','%Y%m%d%H%M') + else: + date_shift0 = datetime.timedelta(days=span_days[0]) - # check pre-requisite jobs are complete - query_past_successes(input_args) + start_date = init_date + date_shift0 + threehour_shift - config_fns: List[str] = input_args['config_paths'] + if len(str(span_days[1]))>3: + # assume it is a date string + end_date = datetime.strptime(span_days[1]+'0000','%Y%m%d%H%M') - for configFile in config_fns: + end_date = end_date + day_shift + else: + date_shift1 = datetime.timedelta(days=span_days[1]) - # they should be working if the script made it this far, no need to try - config_i = open_and_check_config(configFile) + end_date = init_date + date_shift1 +day_shift - #determine end time, from config file - arg_start_date: str = input_args['start_date'] - calc_span_days = config_i['Epidemiology']['CalculationSpanDays'] - assert len(calc_span_days) == 2 + return start_date, end_date - start_time, end_time = calc_epi_date_range(arg_start_date,calc_span_days) + def process_pre_job_epi(self, input_args: dict): + '''Returns a boolean as to whether the job is ready for full processing.''' - # warn if it is a long timespan - date_diff = end_time - start_time - if date_diff.days > 100: - logger.warning("More than 100 days 
will be calculated over, likely longer than any single season") + self.logger.info('started process_pre_job_epi()') - return True + # check pre-requisite jobs are complete + query_past_successes(input_args) + config_fns: List[str] = input_args['config_paths'] -def create_epi_config_string(config,jobPath,startString,endString): + for configFile in config_fns: - configtemplate_fn = config['ConfigFilePath'] - configName_withoutEpi = f"{os.path.basename(configtemplate_fn).replace('.json','')}_{startString}-{endString}" + # they should be working if the script made it this far, no need to try + config_i = open_and_check_config(configFile) - # create a string describing every epi calc configuration - epiStrings = [] - for epiconf in config['Epidemiology']['Epi']: - epiKwargsString = ''.join([f"{k}{v}" for k,v in epiconf['modelArguments'].items()]) + #determine end time, from config file + arg_start_date: str = input_args['start_date'] + calc_span_days = config_i['Epidemiology']['CalculationSpanDays'] + assert len(calc_span_days) == 2 - # drop any repetitive elements of kwarg - epiKwargsString = epiKwargsString.replace('infectionprevious','') - epiKwargsString = epiKwargsString.replace('capbeta','cb') + start_time, end_time = self.calc_epi_date_range(arg_start_date,calc_span_days) - epiCaseString = f"{epiconf['model'].lower()}{epiKwargsString}" + # warn if it is a long timespan + date_diff = end_time - start_time + if date_diff.days > 100: + self.logger.warning("More than 100 days will be calculated over, likely longer than any single season") - # provide to configuration for output filename - epiconf["infectionRasterFileName"] = f"{jobPath}/infections_{configName_withoutEpi}_{epiCaseString}" + return True - epiStrings += [epiCaseString] - epiString = '-'.join(epiStrings) + def create_epi_config_string(self, config,jobPath,startString,endString): - config_filename = f"{configName_withoutEpi}_{epiString}" + configtemplate_fn = config['ConfigFilePath'] + configName_withoutEpi = f"{os.path.basename(configtemplate_fn).replace('.json','')}_{startString}-{endString}" - logger.debug(f"length of config filename is {len(config_filename)}.") + # create a string describing every epi calc configuration + epiStrings = [] + for epiconf in config['Epidemiology']['Epi']: + epiKwargsString = ''.join([f"{k}{v}" for k,v in epiconf['modelArguments'].items()]) - if len(config_filename) > 254: - logger.info(f"filename length is too long, it will raise an OSError, using a short form instead") + # drop any repetitive elements of kwarg + epiKwargsString = epiKwargsString.replace('infectionprevious','') + epiKwargsString = epiKwargsString.replace('capbeta','cb') - # epi cases are not described in filename, an interested user - # must look in the json file for details. 
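For reference, a worked example of calc_epi_date_range as defined above, assuming a typical CalculationSpanDays of [0, 6] and written against the plain-function form (in the refactor it becomes self.calc_epi_date_range):

import datetime

# day zero starts at 0300 UTC because each file covers the preceding three
# hours, and a further 24 h is added so the final day is fully included
start_date, end_date = calc_epi_date_range('20230101', span_days=[0, 6])

assert start_date == datetime.datetime(2023, 1, 1, 3, 0)
assert end_date == datetime.datetime(2023, 1, 8, 0, 0)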
- config_filename = configName_withoutEpi + epiCaseString = f"{epiconf['model'].lower()}{epiKwargsString}" - assert len(config_filename) <= 254 + # provide to configuration for output filename + epiconf["infectionRasterFileName"] = f"{jobPath}/infections_{configName_withoutEpi}_{epiCaseString}" - return config_filename + epiStrings += [epiCaseString] -def are_indices_close(idx1: MultiIndex, idx2: MultiIndex, atol=2.51e-6) -> bool: - """An absolute tolerance of 2.51e-6 relates to differences between the - grid's of NAME vn7.2 output and met-extractor in 2022.""" + epiString = '-'.join(epiStrings) - assert idx1.nlevels == idx2.nlevels - num_levels = idx1.nlevels + config_filename = f"{configName_withoutEpi}_{epiString}" - # a stricter check is idx_i.equals(idx_0) + self.logger.debug(f"length of config filename is {len(config_filename)}.") - levels_close = [] - for i in range(num_levels): - close_i = allclose(idx1.get_level_values(i),idx2.get_level_values(i),atol=atol,rtol=0) - levels_close += [close_i] - - return all(levels_close) + if len(config_filename) > 254: + self.logger.info(f"filename length is too long, it will raise an OSError, using a short form instead") -def raster_to_series(raster_fn): + # epi cases are not described in filename, an interested user + # must look in the json file for details. + config_filename = configName_withoutEpi - with rio_open(raster_fn,'r') as host_raster: - host_arr = host_raster.read(1) - shape = host_raster.shape + assert len(config_filename) <= 254 - # determine coordinates - coords = [host_raster.xy(i,j) for i in range(shape[0]) for j in range(shape[1])] - lons = unique([ci[0] for ci in coords]) - lats = unique([ci[1] for ci in coords]) - assert shape == (lats.size,lons.size) + return config_filename - # build into a dataframe - # (rasters start in the top left, so descending latitude coordinates) - host_df = DataFrame(data=host_arr,index=lats[::-1],columns=lons) - host_df.index.name = 'latitude' - host_df.columns.name = 'longitude' - # rearrange to ascending latitude corodinates - host_df.sort_index(axis='rows',inplace=True) - # make spatial coordinates a multi-index, like for dep and env suit csvs - host_series = host_df.stack() + def are_indices_close(self, idx1: MultiIndex, idx2: MultiIndex, atol=2.51e-6) -> bool: + """An absolute tolerance of 2.51e-6 relates to differences between the + grid's of NAME vn7.2 output and met-extractor in 2022.""" - return host_series + assert idx1.nlevels == idx2.nlevels + num_levels = idx1.nlevels -def rasters_to_csv( - raster_fns_dict: dict, - csv_fn: str, - ): - """Takes a dictionary of raster files with associated times and saves them - as rows of a single csv. The csv columns and index structure matches model - outputs as expected by the epi model. 
Used to prepare the host data.""" - - host_serieses = [] - first = True - for date_valid_from, raster_fn in raster_fns_dict.items(): - - host_series = raster_to_series(raster_fn) + # a stricter check is idx_i.equals(idx_0) - # for now, provide a nominal date of validity to enable a time column - # so far, using mapspam which is a static map, so time is irrelevant - host_series.name = date_valid_from + levels_close = [] + for i in range(num_levels): + close_i = allclose(idx1.get_level_values(i),idx2.get_level_values(i),atol=atol,rtol=0) + levels_close += [close_i] - # conform indices (handle float differences) - if first: - idx_0 = host_series.index + return all(levels_close) - if not first: - idx_i = host_series.index - - indices_are_close = are_indices_close(idx_0,idx_i) - assert indices_are_close, (f"Coordinates of host_rasters do not match.\nFailed for {raster_fn}.") - host_series.index = idx_0 - - first = False - - host_serieses += [host_series] - - host_df = DataFrame(host_serieses) + def raster_to_series(self, raster_fn): - host_df.to_csv(csv_fn) + with rio_open(raster_fn,'r') as host_raster: + host_arr = host_raster.read(1) + shape = host_raster.shape - return - -def get_model_divided_by_host_fraction( - dfm, - hostCSV, - model_colns=None, - **kwargs): - """when model_infection pressure has units of [ha_infected/ha_cell] - we want [ha_infected/ha_wheat] to compare with surveys - (because surveys only sample the wheat covered landscape) - so we must load the host raster and divide all model results by it. - - TODO: Instead of doing as post-processing in coordinator, best do it within - the ews-epidemiology package. - """ - - print('Converting units of prediction from ha_infected/ha_cell to ha_infect/ha_wheat') - - # load host raster - host_fn = hostCSV - host_df = read_csv(host_fn,index_col=0,header=[0,1]) - host_df.columns = host_df.columns.set_levels([lvl.astype('float') for lvl in host_df.columns.levels]) - host_df.index = to_datetime(host_df.index,format='%Y%m%d%H%M') - host_df = host_df.T + # determine coordinates + coords = [host_raster.xy(i,j) for i in range(shape[0]) for j in range(shape[1])] + lons = unique([ci[0] for ci in coords]) + lats = unique([ci[1] for ci in coords]) + assert shape == (lats.size,lons.size) - # conform the structure with infection dataframe + # build into a dataframe + # (rasters start in the top left, so descending latitude coordinates) + host_df = DataFrame(data=host_arr,index=lats[::-1],columns=lons) + host_df.index.name = 'latitude' + host_df.columns.name = 'longitude' + # rearrange to ascending latitude corodinates + host_df.sort_index(axis='rows',inplace=True) + # make spatial coordinates a multi-index, like for dep and env suit csvs + host_series = host_df.stack() - # conform indices (coordinates) - host_df.index = host_df.index.reorder_levels(['longitude','latitude']) - host_df.sort_index(level=['longitude','latitude'],ascending=[True,False],inplace=True) + return host_series - indices_are_close = are_indices_close(host_df.index,dfm.index) - assert indices_are_close, ('Coordinates of model grid do not match host map.') - host_df.index = dfm.index + def rasters_to_csv( + self, + raster_fns_dict: dict, + csv_fn: str, + ): + """Takes a dictionary of raster files with associated times and saves them + as rows of a single csv. The csv columns and index structure matches model + outputs as expected by the epi model. 
Used to prepare the host data.""" - # conform columns (dates) - column_end_dates = dfm.columns.map(lambda x: x[-12:]) - model_dates = to_datetime(column_end_dates, format='%Y%m%d%H%M') - datetime.timedelta(days=1) - dfm2 = dfm.copy() - dfm2.columns = model_dates - - # Set a host value for every model date, based forward-filling dated map to - # next available date - host_df_resampled = host_df.reindex(dfm2.columns,axis='columns',method='ffill') - assert not host_df_resampled.isna().any().any(), ('Dates of host rasters do not cover all model dates') - - # new approach, take advantage of pandas broadcasting - print('Applying unit conversion to all columns in output') - dfm3 = dfm2.divide(host_df_resampled) - # Handle cases of zero division - dfm3[host_df_resampled<=0]=0 - - # check for anomalously large values - where_too_big = dfm3 > 1.00001 - if any(where_too_big): - msg = 'ERROR: Unit conversion failed, host area seems to be smaller than predicted infection area in a cell' - print(msg) - raise Exception + host_serieses = [] + first = True + for date_valid_from, raster_fn in raster_fns_dict.items(): - # clip any values that are above 1 - # (Expect this is not needed, but may help resolve float precision issues) - dfm3.clip(0.,1.,inplace=True) - - # Retain original column names - dfm3.columns = dfm.columns + host_series = self.raster_to_series(raster_fn) - return dfm3 + # for now, provide a nominal date of validity to enable a time column + # so far, using mapspam which is a static map, so time is irrelevant + host_series.name = date_valid_from -def process_in_job_epi(jobPath,status,config,component): - logger.info('started process_in_job_epi()') + # conform indices (handle float differences) + if first: + idx_0 = host_series.index - # TODO: Some of this is modifying config before epi model is run. Determine - # how to account for that + if not first: + idx_i = host_series.index - # initialise any needed variables + indices_are_close = self.are_indices_close(idx_0,idx_i) + assert indices_are_close, (f"Coordinates of host_rasters do not match.\nFailed for {raster_fn}.") + host_series.index = idx_0 - reference_date_str = config['StartString'] - reference_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d') + first = False - start_date, end_date = calc_epi_date_range(reference_date_str,config['Epidemiology']['CalculationSpanDays']) + host_serieses += [host_series] - date_diff = end_date - start_date + host_df = DataFrame(host_serieses) - start_string = start_date.strftime('%Y-%m-%d-%H%M') - start_string_short = start_date.strftime('%Y%m%d%H%M') - end_string = end_date.strftime('%Y-%m-%d-%H%M') + host_df.to_csv(csv_fn) - # update config accordingly - config['ReferenceTime'] = reference_date_str - config['StartTime'] = start_string - config['StartTimeShort'] = start_string_short - config['EndTime'] = end_string + return - yesterday_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d') - datetime.timedelta(days=1) - yesterday_string = yesterday_date.strftime('%Y%m%d') + def get_model_divided_by_host_fraction( + self, + dfm, + hostCSV, + model_colns=None, + **kwargs): + """when model_infection pressure has units of [ha_infected/ha_cell] + we want [ha_infected/ha_wheat] to compare with surveys + (because surveys only sample the wheat covered landscape) + so we must load the host raster and divide all model results by it. - diseases = config['Epidemiology']['DiseaseNames'] + TODO: Instead of doing as post-processing in coordinator, best do it within + the ews-epidemiology package. 
+ """ - def gather_dependent_models(config_epi,config,variable_name,start_date,reference_date,end_date,jobDataPath,lastjobDataPath,status,component='Deposition'): + print('Converting units of prediction from ha_infected/ha_cell to ha_infect/ha_wheat') - # This function is only prepared for components in this list - assert component in ['Deposition','Environment'] + # load host raster + host_fn = hostCSV + host_df = read_csv(host_fn,index_col=0,header=[0,1]) + host_df.columns = host_df.columns.set_levels([lvl.astype('float') for lvl in host_df.columns.levels]) + host_df.index = to_datetime(host_df.index,format='%Y%m%d%H%M') + host_df = host_df.T - # TODO: Simplify the set of required arguments. Check if config is necessary. + # conform the structure with infection dataframe - config_epi[component]['VariableName'] = variable_name # disease_latin_name_dict[disease]+'_DEPOSITION' + # conform indices (coordinates) + host_df.index = host_df.index.reorder_levels(['longitude','latitude']) + host_df.sort_index(level=['longitude','latitude'],ascending=[True,False],inplace=True) - config_epi[component]['FileNamePrepared'] = f"{jobDataPath}/data_input_{component.lower()}.csv" + indices_are_close = self.are_indices_close(host_df.index,dfm.index) + assert indices_are_close, ('Coordinates of model grid do not match host map.') + host_df.index = dfm.index - config_epi[component]['LastFileNamePrepared'] = f"{lastjobDataPath}/data_input_{component.lower()}.csv" + # conform columns (dates) + column_end_dates = dfm.columns.map(lambda x: x[-12:]) + model_dates = to_datetime(column_end_dates, format='%Y%m%d%H%M') - datetime.timedelta(days=1) + dfm2 = dfm.copy() + dfm2.columns = model_dates - # Use config-defined file lister - file_lister_name = config_epi[component]['FileListerFunction'] + # Set a host value for every model date, based forward-filling dated map to + # next available date + host_df_resampled = host_df.reindex(dfm2.columns,axis='columns',method='ffill') + assert not host_df_resampled.isna().any().any(), ('Dates of host rasters do not cover all model dates') - file_lister_func = getattr(lister,file_lister_name) + # new approach, take advantage of pandas broadcasting + print('Applying unit conversion to all columns in output') + dfm3 = dfm2.divide(host_df_resampled) + # Handle cases of zero division + dfm3[host_df_resampled<=0]=0 - config_for_lister = config.copy() - config_for_lister.update(config_epi) + # check for anomalously large values + where_too_big = dfm3 > 1.00001 + if any(where_too_big): + msg = 'ERROR: Unit conversion failed, host area seems to be smaller than predicted infection area in a cell' + print(msg) + raise Exception - lister_kwargs = {} - lister_kwargs['reference_date']=config['ReferenceTime'] + # clip any values that are above 1 + # (Expect this is not needed, but may help resolve float precision issues) + dfm3.clip(0.,1.,inplace=True) - loader_kwargs= {} + # Retain original column names + dfm3.columns = dfm.columns - loader_dict = { - 'Deposition' : loader.load_NAME_file, - 'Environment' : loader.load_env_file, - } + return dfm3 - loader_func = loader_dict[component] + def process_in_job_epi(self, jobPath,status,config,component): + self.logger.info('started process_in_job_epi()') - # Provide component-specific variables - if component == 'Deposition': + # TODO: Some of this is modifying config before epi model is run. 
Determine + # how to account for that - loader_kwargs['VariableName']= config_for_lister[component].get('VariableName') - loader_kwargs['VariableNameAlternative']= config_for_lister[component].get('VariableNameAlternative') + # initialise any needed variables - try: - # Make use of data prepared yesterday - updater.update_input( - config_for_lister, - reference_date, - end_date, - component=component, - file_lister=file_lister_func, - file_loader=loader_func, - lister_kwargs=lister_kwargs, - update_period_days=3, - **loader_kwargs) + reference_date_str = config['StartString'] + reference_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d') - assert os.path.isfile(config_epi[component]['FileNamePrepared']) + start_date, end_date = self.calc_epi_date_range(reference_date_str,config['Epidemiology']['CalculationSpanDays']) - except AssertionError: + date_diff = end_date - start_date - logger.exception(f"Unexpected error in {component} data preparation (updater)") + start_string = start_date.strftime('%Y-%m-%d-%H%M') + start_string_short = start_date.strftime('%Y%m%d%H%M') + end_string = end_date.strftime('%Y-%m-%d-%H%M') - # Performa a fresh load of the full time series + # update config accordingly + config['ReferenceTime'] = reference_date_str + config['StartTime'] = start_string + config['StartTimeShort'] = start_string_short + config['EndTime'] = end_string - try: + yesterday_date = datetime.datetime.strptime(reference_date_str,'%Y%m%d') - datetime.timedelta(days=1) + yesterday_string = yesterday_date.strftime('%Y%m%d') + + diseases = config['Epidemiology']['DiseaseNames'] + + def gather_dependent_models(config_epi,config,variable_name,start_date,reference_date,end_date,jobDataPath,lastjobDataPath,status,component='Deposition'): + + # This function is only prepared for components in this list + assert component in ['Deposition','Environment'] + + # TODO: Simplify the set of required arguments. Check if config is necessary. 
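One possible direction for the TODO above on simplifying the gather_dependent_models argument list, shown purely as a sketch; none of these names exist in the repository.

from dataclasses import dataclass
import datetime


@dataclass
class DependentModelRequest:
    # hypothetical bundle of the per-component values currently passed to
    # gather_dependent_models individually
    component: str                    # 'Deposition' or 'Environment'
    variable_name: str
    job_data_path: str
    last_job_data_path: str
    start_date: datetime.datetime
    reference_date: datetime.datetime
    end_date: datetime.datetime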
+ + config_epi[component]['VariableName'] = variable_name # disease_latin_name_dict[disease]+'_DEPOSITION' + + config_epi[component]['FileNamePrepared'] = f"{jobDataPath}/data_input_{component.lower()}.csv" + + config_epi[component]['LastFileNamePrepared'] = f"{lastjobDataPath}/data_input_{component.lower()}.csv" + + # Use config-defined file lister + file_lister_name = config_epi[component]['FileListerFunction'] + + file_lister_func = getattr(lister,file_lister_name) - prep.prep_input( + config_for_lister = config.copy() + config_for_lister.update(config_epi) + + lister_kwargs = {} + lister_kwargs['reference_date']=config['ReferenceTime'] + + loader_kwargs= {} + + loader_dict = { + 'Deposition' : loader.load_NAME_file, + 'Environment' : loader.load_env_file, + } + + loader_func = loader_dict[component] + + # Provide component-specific variables + if component == 'Deposition': + + loader_kwargs['VariableName']= config_for_lister[component].get('VariableName') + loader_kwargs['VariableNameAlternative']= config_for_lister[component].get('VariableNameAlternative') + + try: + # Make use of data prepared yesterday + updater.update_input( config_for_lister, - start_date, + reference_date, end_date, component=component, file_lister=file_lister_func, file_loader=loader_func, lister_kwargs=lister_kwargs, + update_period_days=3, **loader_kwargs) assert os.path.isfile(config_epi[component]['FileNamePrepared']) - except: + except AssertionError: - logger.exception(f"Unexpected error in {component} data preparation (full load)") - status.reset('ERROR') - endJob(status,premature=True) + self.logger.exception(f"Unexpected error in {component} data preparation (updater)") - return + # Performa a fresh load of the full time series + + try: + + prep.prep_input( + config_for_lister, + start_date, + end_date, + component=component, + file_lister=file_lister_func, + file_loader=loader_func, + lister_kwargs=lister_kwargs, + **loader_kwargs) + + assert os.path.isfile(config_epi[component]['FileNamePrepared']) - # get list of variable names to be loaded from deposition input - depo_variable_names = config['Epidemiology']['Deposition']['VariableNames'] - assert len(depo_variable_names) == len(diseases) + except: - # loop over each sub region + self.logger.exception(f"Unexpected error in {component} data preparation (full load)") + status.reset('ERROR') + endJob(status,premature=True) - region = config['RegionName'] - #for region in config['SubRegionNames']: + return - for disease in diseases: + # get list of variable names to be loaded from deposition input + depo_variable_names = config['Epidemiology']['Deposition']['VariableNames'] + assert len(depo_variable_names) == len(diseases) - assert disease in disease_latin_name_dict.keys() + # loop over each sub region - config['SubRegionName'] = region - config['DiseaseName'] = disease + region = config['RegionName'] + #for region in config['SubRegionNames']: - config_epi = config['Epidemiology'].copy() + for disease in diseases: - # TODO: CAUTION: Any iterations (e.g. disease or sub-region) are hidden - # in jobPath, and not retained in the config file. This is a provlem for - # process_EWS_plotting_epi which receives a single config file and must - # try a fudge to retrieve details for each iteration. - # This should be improved, either by making the one config file - # aware of all of the iterations, or looping over iterations in - # Processor.py with one iteration-specific config. 
- case_specific_path = f"{jobPath}/{region}/{disease}/" - Path(case_specific_path).mkdir(parents=True, exist_ok=True) + assert disease in disease_latin_name_dict.keys() - logger.info(f"Preparing for epidemiology calc of {disease} in {region}") + config['SubRegionName'] = region + config['DiseaseName'] = disease - # create config_filename to describe job configuration - config_filename = create_epi_config_string(config,case_specific_path,start_string,end_string) + config_epi = config['Epidemiology'].copy() - # prepare a directory for input data - jobDataPath = f"{case_specific_path}/input_data/" - Path(jobDataPath).mkdir(parents=True, exist_ok=True) + # TODO: CAUTION: Any iterations (e.g. disease or sub-region) are hidden + # in jobPath, and not retained in the config file. This is a provlem for + # process_EWS_plotting_epi which receives a single config file and must + # try a fudge to retrieve details for each iteration. + # This should be improved, either by making the one config file + # aware of all of the iterations, or looping over iterations in + # Processor.py with one iteration-specific config. + case_specific_path = f"{jobPath}/{region}/{disease}/" + Path(case_specific_path).mkdir(parents=True, exist_ok=True) - lastjobDataPath = jobDataPath.replace(f"EPI_{reference_date_str}",f"EPI_{yesterday_string}") + self.logger.info(f"Preparing for epidemiology calc of {disease} in {region}") - # configure filename of prepared deposition data + # create config_filename to describe job configuration + config_filename = self.create_epi_config_string(config,case_specific_path,start_string,end_string) - if 'Deposition' in config_epi: + # prepare a directory for input data + jobDataPath = f"{case_specific_path}/input_data/" + Path(jobDataPath).mkdir(parents=True, exist_ok=True) - # determine which variable name to load for this disease - disease_idx = [i for i,j in enumerate(diseases) if j==disease][0] + lastjobDataPath = jobDataPath.replace(f"EPI_{reference_date_str}",f"EPI_{yesterday_string}") - variable_name = depo_variable_names[disease_idx] + # configure filename of prepared deposition data - gather_dependent_models( - config_epi, - config, - variable_name, - start_date, - reference_date, - end_date, - jobDataPath, - lastjobDataPath, - status, - component='Deposition') - - # configure filename of prepared deposition data + if 'Deposition' in config_epi: - if 'Environment' in config_epi: + # determine which variable name to load for this disease + disease_idx = [i for i,j in enumerate(diseases) if j==disease][0] - logger.info('Preparing environmental suitability data') + variable_name = depo_variable_names[disease_idx] - gather_dependent_models( - config_epi, - config, - variable_name, - start_date, - reference_date, - end_date, - jobDataPath, - lastjobDataPath, - status, - component='Environment') - - # prepare a copy of the host data + gather_dependent_models( + config_epi, + config, + variable_name, + start_date, + reference_date, + end_date, + jobDataPath, + lastjobDataPath, + status, + component='Deposition') - logger.info('Preparing a copy of the host raster data') - - # TargetRaster defines the grid that the epi model works on. - assert 'TargetRaster' in config_epi['Host'] + # configure filename of prepared deposition data - # It should have been generated in advance by the user, by reprojecting - # the available host map (e.g. MapSPAM) to the NAME output grid. - # wheat_raster_reprojection.py is available to support this. 
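The comments above note that the host map (e.g. MapSPAM) must be reprojected onto the NAME output grid in advance, with wheat_raster_reprojection.py available to help. Purely as an illustration of that step, and not the contents of that script, a generic rasterio sketch with placeholder file names:

import rasterio
from rasterio.warp import reproject, Resampling

# reproject a host map onto the grid of an existing target raster; averaging
# is one reasonable resampling choice for an area-fraction layer
with rasterio.open('mapspam_wheat.tif') as src, rasterio.open('name_target_grid.tif') as target:
    profile = target.profile.copy()
    profile.update(count=1, dtype=src.dtypes[0])
    with rasterio.open('wheat_on_target_grid.tif', 'w', **profile) as dst:
        reproject(
            source=rasterio.band(src, 1),
            destination=rasterio.band(dst, 1),
            src_transform=src.transform,
            src_crs=src.crs,
            dst_transform=target.transform,
            dst_crs=target.crs,
            resampling=Resampling.average,
        )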
+ if 'Environment' in config_epi: - if 'HostRasters' in config_epi['Host']: - # HostRasters is a dictionary with date: filename entries describing - # different host rasters valid at different times i.e. a simple - # representation of dynamic host, so prepare a host file as is done - # for the Deposition and Environment components. + self.logger.info('Preparing environmental suitability data') - # All host maps should have the same spatial grid as the TargetRaster + gather_dependent_models( + config_epi, + config, + variable_name, + start_date, + reference_date, + end_date, + jobDataPath, + lastjobDataPath, + status, + component='Environment') - rasters_dict = config_epi['Host']['HostRasters'] + # prepare a copy of the host data - dst_host_csv = f"{jobDataPath}/data_input_host.csv" + self.logger.info('Preparing a copy of the host raster data') - rasters_to_csv(rasters_dict,dst_host_csv) + # TargetRaster defines the grid that the epi model works on. + assert 'TargetRaster' in config_epi['Host'] - else: - # There is a host raster applicable to all times, i.e. static host + # It should have been generated in advance by the user, by reprojecting + # the available host map (e.g. MapSPAM) to the NAME output grid. + # wheat_raster_reprojection.py is available to support this. - src_host = config_epi['Host']['TargetRaster'] - fn_host = os.path.basename(src_host) - dst_host = f"{jobDataPath}/{fn_host}" + if 'HostRasters' in config_epi['Host']: + # HostRasters is a dictionary with date: filename entries describing + # different host rasters valid at different times i.e. a simple + # representation of dynamic host, so prepare a host file as is done + # for the Deposition and Environment components. - # copy the tif to the job directory and refer to that instead - shutil.copyfile(src_host,dst_host) - config_epi['Host']['TargetRaster'] = dst_host + # All host maps should have the same spatial grid as the TargetRaster - logger.info('Preparing a copy of the host data as csv') + rasters_dict = config_epi['Host']['HostRasters'] - dst_host_csv = dst_host.replace('.tif','.csv') + dst_host_csv = f"{jobDataPath}/data_input_host.csv" - rasters_to_csv( - {'201001010000': dst_host}, - dst_host_csv) + self.rasters_to_csv(rasters_dict,dst_host_csv) - config_epi['Host']['HostCSV'] = dst_host_csv - config_epi['Host']['FileNamePrepared'] = dst_host_csv + else: + # There is a host raster applicable to all times, i.e. 
static host - # provide fundamental config elements to config_epi - for k,v in config.items(): - if k not in short_name.keys(): - config_epi[k]=v + src_host = config_epi['Host']['TargetRaster'] + fn_host = os.path.basename(src_host) + dst_host = f"{jobDataPath}/{fn_host}" - logger.debug('Incremental configuration looks like:') - def print_item(item): - logger.debug(f"Item {item}") - logger.debug(json.dumps(item,indent=2)) - def iterate(items): - for item in items.items(): - if hasattr(item,'items'): - # iterate - iterate(item) - else: - print_item(item) - iterate(config_epi) + # copy the tif to the job directory and refer to that instead + shutil.copyfile(src_host,dst_host) + config_epi['Host']['TargetRaster'] = dst_host - logger.debug('Complete configuration looks like:') - logger.debug(json.dumps(config_epi,indent=2)) + self.logger.info('Preparing a copy of the host data as csv') - # write the complete configuration file to job directory - with open(f"{case_specific_path}/{config_filename}.json",'w') as write_file: - json.dump(config_epi,write_file,indent=4) + dst_host_csv = dst_host.replace('.tif','.csv') - # run epi model + self.rasters_to_csv( + {'201001010000': dst_host}, + dst_host_csv) - try: - EpiModel.run_epi_model(f"{case_specific_path}/{config_filename}.json") - except: - logger.exception('Unexpected error in EpiModel') - raise + config_epi['Host']['HostCSV'] = dst_host_csv + config_epi['Host']['FileNamePrepared'] = dst_host_csv - # perform calc on output + # provide fundamental config elements to config_epi + for k,v in config.items(): + if k not in short_name.keys(): + config_epi[k]=v - def calc_total(arr): - return 'total', arr.sum() + self.logger.debug('Incremental configuration looks like:') + def print_item(item): + self.logger.debug(f"Item {item}") + self.logger.debug(json.dumps(item,indent=2)) + def iterate(items): + for item in items.items(): + if hasattr(item,'items'): + # iterate + iterate(item) + else: + print_item(item) + iterate(config_epi) - def calc_max(arr): - return 'maximum', arr.max() + self.logger.debug('Complete configuration looks like:') + self.logger.debug(json.dumps(config_epi,indent=2)) - def calc_mean(arr): - return 'mean', arr.mean() + # write the complete configuration file to job directory + with open(f"{case_specific_path}/{config_filename}.json",'w') as write_file: + json.dump(config_epi,write_file,indent=4) - for epiconf in config['Epidemiology']['Epi']: + # run epi model + + try: + EpiModel.run_epi_model(f"{case_specific_path}/{config_filename}.json") + except: + self.logger.exception('Unexpected error in EpiModel') + raise - outfile = epiconf["infectionRasterFileName"] + # perform calc on output - with rio_open(outfile+'.tif','r') as infectionRaster: - infection = infectionRaster.read(1) + def calc_total(arr): + return 'total', arr.sum() - # define function to quantify overall result, for easy check - # TODO: Create a more meaningful result? 
- # TODO: make this configurable - analysis_func = calc_mean + def calc_max(arr): + return 'maximum', arr.max() - analysis_desc, analysis_value = analysis_func(infection) + def calc_mean(arr): + return 'mean', arr.mean() - logger.info(f"For case {outfile}") - logger.info('Infection {:s} is {:.2e}'.format( analysis_desc, analysis_value)) + for epiconf in config['Epidemiology']['Epi']: - # to save tif as png for easy viewing - logger.debug('Saving tif output as png for easier viewing') - plotRaster.save_raster_as_png(outfile) + outfile = epiconf["infectionRasterFileName"] - # comparison figure + with rio_open(outfile+'.tif','r') as infectionRaster: + infection = infectionRaster.read(1) - # TODO: make this plot configurable? with function or args? - #logger.info('Plotting epi output alongside contributing components') - # figure_func = getattr(EpiAnalysis,'plot_compare_host_env_dep_infection') - logger.info('Plotting composite image of epi formulations') - figure_func = getattr(EpiAnalysis,'plot_compare_epi_cases') + # define function to quantify overall result, for easy check + # TODO: Create a more meaningful result? + # TODO: make this configurable + analysis_func = calc_mean - # isolate the config for this function, in case of modifications - config_epi_for_comparison = config_epi.copy() + analysis_desc, analysis_value = analysis_func(infection) - fig,axes,cases = figure_func( - config_epi_for_comparison, - start_str = start_string, - end_str = end_string) + self.logger.info(f"For case {outfile}") + self.logger.info('Infection {:s} is {:.2e}'.format( analysis_desc, analysis_value)) - SaveFileName = f"{case_specific_path}/EPI_{config_filename}_comparison" + # to save tif as png for easy viewing + self.logger.debug('Saving tif output as png for easier viewing') + plotRaster.save_raster_as_png(outfile) - fig.savefig(SaveFileName+'.png',dpi=300) + # comparison figure - # slice the epi results into before forecast and in forecast + # TODO: make this plot configurable? with function or args? 
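The "make this plot configurable?" question above could be answered with a name lookup on the EpiAnalysis module, defaulting to the current hard-coded choice; the 'ComparisonFigureFunction' key below is an assumed config entry, not one that exists today.

# choose the comparison-figure function by name from config
figure_func_name = config_epi.get('ComparisonFigureFunction', 'plot_compare_epi_cases')
figure_func = getattr(EpiAnalysis, figure_func_name)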
+ #logger.info('Plotting epi output alongside contributing components') + # figure_func = getattr(EpiAnalysis,'plot_compare_host_env_dep_infection') + self.logger.info('Plotting composite image of epi formulations') + figure_func = getattr(EpiAnalysis,'plot_compare_epi_cases') - for epiconf in config['Epidemiology']['Epi']: + # isolate the config for this function, in case of modifications + config_epi_for_comparison = config_epi.copy() - outfile = epiconf["infectionRasterFileName"]+'_progression.csv' + fig,axes,cases = figure_func( + config_epi_for_comparison, + start_str = start_string, + end_str = end_string) - # load the full epi results - df_full = read_csv(outfile,header=[0],index_col=[0,1]) - column_date_fmt = f"X{config['StartTimeShort']}_X%Y%m%d%H%M" - df_full_dates = to_datetime(df_full.columns.astype('str'),format=column_date_fmt) + SaveFileName = f"{case_specific_path}/EPI_{config_filename}_comparison" - unit_description = '' + fig.savefig(SaveFileName+'.png',dpi=300) - if epiconf['rescale_output_by_host_raster'] is True: + # slice the epi results into before forecast and in forecast - unit_description = '_per_ha_wheat' + for epiconf in config['Epidemiology']['Epi']: - model_colns = df_full.columns + outfile = epiconf["infectionRasterFileName"]+'_progression.csv' - # convert units from ha_infected/ha_cell to ha_infected/ha_wheat - - df_full = get_model_divided_by_host_fraction( - df_full, - config_epi['Host']['HostCSV'], - model_colns = model_colns) + # load the full epi results + df_full = read_csv(outfile,header=[0],index_col=[0,1]) + column_date_fmt = f"X{config['StartTimeShort']}_X%Y%m%d%H%M" + df_full_dates = to_datetime(df_full.columns.astype('str'),format=column_date_fmt) - # save to csv - outfile_hawheat = f"{epiconf['infectionRasterFileName']}{unit_description}_progression.csv" - df_full.to_csv(outfile_hawheat,header=True,index=True) + unit_description = '' + + if epiconf['rescale_output_by_host_raster'] is True: + + unit_description = '_per_ha_wheat' + + model_colns = df_full.columns - outfile_hawheat_final = f"{epiconf['infectionRasterFileName']}{unit_description}.csv" - df_full.iloc[:,-1].to_csv(outfile_hawheat_final,header=True,index=True) + # convert units from ha_infected/ha_cell to ha_infected/ha_wheat - # determine date to cut with - # plus 1 minute so midnight is associated with preceding day - date_to_cut = datetime.datetime.strptime(config['StartString']+'0001','%Y%m%d%H%M') - dates_after_cut = df_full_dates >= date_to_cut - idx = argmax(dates_after_cut)-1 + df_full = self.get_model_divided_by_host_fraction( + df_full, + config_epi['Host']['HostCSV'], + model_colns = model_colns) - if idx == -1: - # only working on forecast dates so no season so far, skip - continue + # save to csv + outfile_hawheat = f"{epiconf['infectionRasterFileName']}{unit_description}_progression.csv" + df_full.to_csv(outfile_hawheat,header=True,index=True) - # build seasonsofar dataframe (only need the last date) - df_seasonsofar = df_full.iloc[:,idx] + outfile_hawheat_final = f"{epiconf['infectionRasterFileName']}{unit_description}.csv" + df_full.iloc[:,-1].to_csv(outfile_hawheat_final,header=True,index=True) - # check column name is defined as expected - # from epi start time to forecast start time - column_name = f"X{config['StartTimeShort']}_X{config['StartString']}0000" - assert df_seasonsofar.name == column_name + # determine date to cut with + # plus 1 minute so midnight is associated with preceding day + date_to_cut = 
datetime.datetime.strptime(config['StartString']+'0001','%Y%m%d%H%M') + dates_after_cut = df_full_dates >= date_to_cut + idx = argmax(dates_after_cut)-1 - # save to csv - fn_seasonsofar = f"{epiconf['infectionRasterFileName']}{unit_description}_seasonsofar.csv" - df_seasonsofar.to_csv(fn_seasonsofar,header=True,index=True) + if idx == -1: + # only working on forecast dates so no season so far, skip + continue - # build weekahead dataframe and save to csv - df_fc_start = df_full.iloc[:,idx] - df_fc_start_name = df_fc_start.name.split('_')[-1] + # build seasonsofar dataframe (only need the last date) + df_seasonsofar = df_full.iloc[:,idx] - df_fc_end = df_full.iloc[:,-1] - df_fc_end_name = df_fc_end.name.split('_')[-1] + # check column name is defined as expected + # from epi start time to forecast start time + column_name = f"X{config['StartTimeShort']}_X{config['StartString']}0000" + assert df_seasonsofar.name == column_name - df_weekahead = df_fc_end - df_fc_start + # save to csv + fn_seasonsofar = f"{epiconf['infectionRasterFileName']}{unit_description}_seasonsofar.csv" + df_seasonsofar.to_csv(fn_seasonsofar,header=True,index=True) - # defined column name - fn_weekahead = f"{epiconf['infectionRasterFileName']}{unit_description}_weekahead.csv" - df_weekahead.name = '_'.join([df_fc_start_name,df_fc_end_name]) + # build weekahead dataframe and save to csv + df_fc_start = df_full.iloc[:,idx] + df_fc_start_name = df_fc_start.name.split('_')[-1] - # save to csv - df_weekahead.to_csv(fn_weekahead,header=True,index=True) - - proc_out = {} - # Output files available for upload - proc_out['output'] = None - # Processing files available for clearing - proc_out['clearup'] = None + df_fc_end = df_full.iloc[:,-1] + df_fc_end_name = df_fc_end.name.split('_')[-1] - return proc_out + df_weekahead = df_fc_end - df_fc_start -def process_EWS_plotting_epi(jobPath,config): - '''Returns a list of output files for transfer.''' + # defined column name + fn_weekahead = f"{epiconf['infectionRasterFileName']}{unit_description}_weekahead.csv" + df_weekahead.name = '_'.join([df_fc_start_name,df_fc_end_name]) - logger.info('started process_EWS_plotting_epi()') + # save to csv + df_weekahead.to_csv(fn_weekahead,header=True,index=True) - # initalise necessary variables from config + proc_out = {} + # Output files available for upload + proc_out['output'] = None + # Processing files available for clearing + proc_out['clearup'] = None - start_date, end_date = calc_epi_date_range(config['StartString'],config['Epidemiology']['CalculationSpanDays']) + return proc_out - start_string = start_date.strftime('%Y%m%d') - end_string = end_date.strftime('%Y%m%d') + def process_EWS_plotting_epi(self, jobPath,config): + '''Returns a list of output files for transfer.''' - epi_case_operational = config['Epidemiology']['EWS-Plotting']['EpiCase'] + self.logger.info('started process_EWS_plotting_epi()') - if epi_case_operational == 'none': - logger.info('Config specifies not to call to EWS-Plotting') - return [] + # initalise necessary variables from config - diseases = config['Epidemiology']['DiseaseNames'] + start_date, end_date = self.calc_epi_date_range(config['StartString'],config['Epidemiology']['CalculationSpanDays']) - # initialise environment - sys_config = config['Epidemiology']['EWS-Plotting']['SysConfig'] + start_string = start_date.strftime('%Y%m%d') + end_string = end_date.strftime('%Y%m%d') - chart_config = config['Epidemiology']['EWS-Plotting']['ChartConfig'] + epi_case_operational = 
config['Epidemiology']['EWS-Plotting']['EpiCase'] - # use the first matching epi formulation - # TODO: Is there a more efficient way to select? - epi_filename = [ce['infectionRasterFileName'] for ce in config['Epidemiology']['Epi'] if ce['model']==epi_case_operational][0] + if epi_case_operational == 'none': + self.logger.info('Config specifies not to call to EWS-Plotting') + return [] - dep_regionnames = ['SouthAsia','Ethiopia'] + diseases = config['Epidemiology']['DiseaseNames'] - # TODO get deposition_dir from config['Epidemiology']['Deposition']['PathTemplate'] - dep_regionname = 'Ethiopia' #SouthAsia + # initialise environment + sys_config = config['Epidemiology']['EWS-Plotting']['SysConfig'] - deposition_dir = f"{config['WorkspacePath']}DEPOSITION_{start_string}/WR_NAME_{dep_regionname}_{start_string}/" + chart_config = config['Epidemiology']['EWS-Plotting']['ChartConfig'] - # TODO: handle multiple diseases and regions in Processor as a loop, or in the config - deposition_disease_name = [disease_latin_name_dict[disease]+'_DEPOSITION' for disease in diseases][0] + # use the first matching epi formulation + # TODO: Is there a more efficient way to select? + epi_filename = [ce['infectionRasterFileName'] for ce in config['Epidemiology']['Epi'] if ce['model']==epi_case_operational][0] - ews_plot_dir = f"{jobPath}/plotting/" + dep_regionnames = ['SouthAsia','Ethiopia'] - Path(ews_plot_dir).mkdir(parents=True, exist_ok=True) + # TODO get deposition_dir from config['Epidemiology']['Deposition']['PathTemplate'] + dep_regionname = 'Ethiopia' #SouthAsia - # loop over diseases - EWSPlottingOutputGlobs = [] - for disease in diseases: - disease_short = disease.lower().replace('rust','') + deposition_dir = f"{config['WorkspacePath']}DEPOSITION_{start_string}/WR_NAME_{dep_regionname}_{start_string}/" - # a fudge, guess disease type - # because config['Epidemiology']['ProcessInJob'] handles disease loop internally - # assumes disease name is the last directory before the filename # TODO: handle multiple diseases and regions in Processor as a loop, or in the config - disease_to_drop = os.path.dirname(epi_filename).split('/')[-1].replace('Rust','') - disease_to_add = disease.replace('Rust','') - epi_filename = epi_filename.replace(disease_to_drop,disease_to_add) + deposition_disease_name = [disease_latin_name_dict[disease]+'_DEPOSITION' for disease in diseases][0] - map_title = "Integrated prediction of Wheat $\\bf{" + disease_to_add + "}$ Rust infection" - if 'PlottingRegionName' not in config['Epidemiology']['EWS-Plotting']: - plotting_region_name_lower = config['RegionName'].lower() - else: - plotting_region_name_lower = config['Epidemiology']['EWS-Plotting']['PlottingRegionName'].lower() + ews_plot_dir = f"{jobPath}/plotting/" + + Path(ews_plot_dir).mkdir(parents=True, exist_ok=True) + + # loop over diseases + EWSPlottingOutputGlobs = [] + for disease in diseases: + disease_short = disease.lower().replace('rust','') + + # a fudge, guess disease type + # because config['Epidemiology']['ProcessInJob'] handles disease loop internally + # assumes disease name is the last directory before the filename + # TODO: handle multiple diseases and regions in Processor as a loop, or in the config + disease_to_drop = os.path.dirname(epi_filename).split('/')[-1].replace('Rust','') + disease_to_add = disease.replace('Rust','') + epi_filename = epi_filename.replace(disease_to_drop,disease_to_add) + + map_title = "Integrated prediction of Wheat $\\bf{" + disease_to_add + "}$ Rust infection" + if 'PlottingRegionName' 
not in config['Epidemiology']['EWS-Plotting']: + plotting_region_name_lower = config['RegionName'].lower() + else: + plotting_region_name_lower = config['Epidemiology']['EWS-Plotting']['PlottingRegionName'].lower() + + epi_seasonsofar_fn = epi_filename+'_per_ha_wheat_seasonsofar.csv' - epi_seasonsofar_fn = epi_filename+'_per_ha_wheat_seasonsofar.csv' + epi_seasonincforecast_fn = epi_filename+'_per_ha_wheat.csv' - epi_seasonincforecast_fn = epi_filename+'_per_ha_wheat.csv' + seasonsofar_run_config = config['Epidemiology']['EWS-Plotting'].get('RunConfig_seasonsofar',None) - seasonsofar_run_config = config['Epidemiology']['EWS-Plotting'].get('RunConfig_seasonsofar',None) + # only run season so far (i.e. historic dates) if they exist + if (seasonsofar_run_config is not None) & os.path.exists(epi_seasonsofar_fn): - # only run season so far (i.e. historic dates) if they exist - if (seasonsofar_run_config is not None) & os.path.exists(epi_seasonsofar_fn): + self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{seasonsofar_run_config}\n{chart_config}") - logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{seasonsofar_run_config}\n{chart_config}") + epi_processor_1 = EPIPostPostProcessor() + epi_processor_1.set_param_config_files(sys_params_file_arg=sys_config, + chart_params_file_arg=chart_config, + run_params_file_arg=seasonsofar_run_config, + epi_input_csv_arg=epi_seasonsofar_fn, + disease_type_arg=disease_short+'_seasontodate', + issue_date_arg=start_string, + output_dir_arg=ews_plot_dir, + wheat_sources_dir_arg=deposition_dir, + wheat_source_disease_name_arg=deposition_disease_name, + map_title_arg=map_title, + chart_area_prefix=plotting_region_name_lower) + epi_processor_1.process() - epi_processor_1 = EPIPostPostProcessor() - epi_processor_1.set_param_config_files(sys_params_file_arg=sys_config, + # prepare command for seasonplusforecast + + run_config = config['Epidemiology']['EWS-Plotting']['RunConfig_seasonplusforecast'] + + self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}") + + epi_processor_2 = EPIPostPostProcessor() + epi_processor_2.set_param_config_files(sys_params_file_arg=sys_config, chart_params_file_arg=chart_config, - run_params_file_arg=seasonsofar_run_config, - epi_input_csv_arg=epi_seasonsofar_fn, - disease_type_arg=disease_short+'_seasontodate', + run_params_file_arg=run_config, + epi_input_csv_arg=epi_seasonincforecast_fn, # for seasonplusforecast + #epi_input_csv_arg=epi_filename+'_weekahead.csv', # for weekahead + disease_type_arg=disease_short+'_seasonincforecast', issue_date_arg=start_string, output_dir_arg=ews_plot_dir, wheat_sources_dir_arg=deposition_dir, wheat_source_disease_name_arg=deposition_disease_name, map_title_arg=map_title, chart_area_prefix=plotting_region_name_lower) - epi_processor_1.process() - - # prepare command for seasonplusforecast - - run_config = config['Epidemiology']['EWS-Plotting']['RunConfig_seasonplusforecast'] - - logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}") - - epi_processor_2 = EPIPostPostProcessor() - epi_processor_2.set_param_config_files(sys_params_file_arg=sys_config, - chart_params_file_arg=chart_config, - run_params_file_arg=run_config, - epi_input_csv_arg=epi_seasonincforecast_fn, # for seasonplusforecast - #epi_input_csv_arg=epi_filename+'_weekahead.csv', # for weekahead - disease_type_arg=disease_short+'_seasonincforecast', - 
issue_date_arg=start_string, - output_dir_arg=ews_plot_dir, - wheat_sources_dir_arg=deposition_dir, - wheat_source_disease_name_arg=deposition_disease_name, - map_title_arg=map_title, - chart_area_prefix=plotting_region_name_lower) - epi_processor_2.process() + epi_processor_2.process() - # check the output - EWSPlottingOutputDir = f"{ews_plot_dir}/images/" - # TODO: Make this smarter, connected to the results of EWSPlottingEPIBase.plot_epi() - EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}infection_{plotting_region_name_lower}_*{disease_short}*.png"] + # check the output + EWSPlottingOutputDir = f"{ews_plot_dir}/images/" + # TODO: Make this smarter, connected to the results of EWSPlottingEPIBase.plot_epi() + EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}infection_{plotting_region_name_lower}_*{disease_short}*.png"] - EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) - # check there is some output from EWS-plotting - if not EWSPlottingOutputGlobs: - logger.error('EWS-Plotting did not produce any output') - raise RuntimeError + # check there is some output from EWS-plotting + if not EWSPlottingOutputGlobs: + self.logger.error('EWS-Plotting did not produce any output') + raise RuntimeError - # provide to list for transfer - EWSPlottingOutputs = [item for EWSPlottingOutput in EWSPlottingOutputGlobs for item in glob(EWSPlottingOutput)] + # provide to list for transfer + EWSPlottingOutputs = [item for EWSPlottingOutput in EWSPlottingOutputGlobs for item in glob(EWSPlottingOutput)] - return EWSPlottingOutputs + return EWSPlottingOutputs diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py index 2f2e9975493745b8fc7b6ac841218d92a9b007bc..41777d0ffd93787bbb684499e06c70bf526acaa8 100644 --- a/coordinator/ProcessorSurveys.py +++ b/coordinator/ProcessorSurveys.py @@ -29,6 +29,7 @@ from numpy import any as np_any from shutil import copyfile from pandas import read_csv, concat +from Processor import Processor from source_gen.clustering import run_case from ProcessorSurveysODK import get_ODK_form_as_csv @@ -43,401 +44,418 @@ from ProcessorUtils import ( add_filters_to_sublogger, ) -logger = logging.getLogger('Processor.Surveys') -add_filters_to_sublogger(logger) -GET_FORM_AS_CSV_DICT = { - 'ODK' : get_ODK_form_as_csv, - 'kobotoolbox' : get_kobotoolbox_form_as_csv, - 'WRSIS' : get_WRSIS_form_as_csv, - 'WRT' : get_WRT_form_as_csv, - 'ODKv2' : get_ODKv2_form_as_csv, - 'newODK' : get_newODK_form_as_csv, -} +class ProcessorSurveys(Processor): -def process_pre_job_survey(input_args): - '''Returns a boolean as to whether the job is ready for full processing.''' - logger.info('started process_pre_job_survey(), nothing to do') + def process_pre_job(self, args): + return self.process_pre_job_survey(args) - return True -def process_in_job_survey(jobPath,status,config,component): - logger.info('started process_in_job_survey()') + def process_in_job(self, jobPath, status, configjson, component) -> object: + return self.process_in_job_survey(jobPath, status, configjson, component) - logger.debug('Performing download(s) from ODK server') - credentials_filename = config['Survey']['ServerCredentialsFile'] - with open(credentials_filename) as credentials_file: + def process_post_job(self, jobPath, configjson): + return self.process_EWS_plotting_survey(jobPath, configjson) - cred: dict = json.load(credentials_file) - assert 'forms' in cred.keys() + def 
__init__(self) -> None: + super().__init__() + logger = logging.getLogger('Processor.Surveys') + add_filters_to_sublogger(logger) - csv_filenames = {} - for form in cred['forms']: + self.GET_FORM_AS_CSV_DICT = { + 'ODK': get_ODK_form_as_csv, + 'kobotoolbox': get_kobotoolbox_form_as_csv, + 'WRSIS': get_WRSIS_form_as_csv, + 'WRT': get_WRT_form_as_csv, + 'ODKv2': get_ODKv2_form_as_csv, + 'newODK': get_newODK_form_as_csv, + } - logger.debug(f"Starting to download {form['form_id']}") + def process_pre_job_survey(self, input_args): + '''Returns a boolean as to whether the job is ready for full processing.''' + self.logger.info('started process_pre_job_survey(), nothing to do') - assert form['type'] in GET_FORM_AS_CSV_DICT + return True - func_get_form_as_csv = GET_FORM_AS_CSV_DICT[form['type']] + def process_in_job_survey(self, jobPath,status,config,component): + self.logger.info('started process_in_job_survey()') - csv_filename = func_get_form_as_csv(form, jobPath, config, status) + self.logger.debug('Performing download(s) from ODK server') - csv_filenames[form['form_id']] = csv_filename + credentials_filename = config['Survey']['ServerCredentialsFile'] + with open(credentials_filename) as credentials_file: - # load each file of surveys as a dataframe - forms = {} - for form_name,form_fn in csv_filenames.items(): + cred: dict = json.load(credentials_file) - # some define column types, hardwired for now - col_types = {'comment':'str','KEY':'str'} + assert 'forms' in cred.keys() - form_df = read_csv(form_fn,dtype=col_types) + csv_filenames = {} + for form in cred['forms']: - forms[form_name] = form_df + self.logger.debug(f"Starting to download {form['form_id']}") - # create some standard dataframe modification functions - def add_column(df,coln,value): - df[coln]=value - return + assert form['type'] in self.GET_FORM_AS_CSV_DICT - def remove_column(df,coln,value): - del df[coln] - return + func_get_form_as_csv = self.GET_FORM_AS_CSV_DICT[form['type']] - def replace_column(df,coln,value): - df[coln]=value - return + csv_filename = func_get_form_as_csv(form, jobPath, config, status) - def filter_by_column(df,coln,value): - # CAUTION: This requires surveyor to provide the correct country - df.drop(df.loc[df[coln]!=value].index,inplace=True) - #TODO : for Kenya data, provide a coordinate-based filter - return + csv_filenames[form['form_id']] = csv_filename - def filter_by_list(df,coln,values): - # CAUTION: This requires surveyor to provide the correct list of countries - df.drop(df.loc[~df[coln].isin(values)].index,inplace=True) - return + # load each file of surveys as a dataframe + forms = {} + for form_name,form_fn in csv_filenames.items(): - func_types = { - 'add': add_column, - 'remove' : remove_column, - 'replace' : replace_column, - 'filter' : filter_by_column, - 'filter_by_list' : filter_by_list - } + # some define column types, hardwired for now + col_types = {'comment':'str','KEY':'str'} - # simple format alignment using edits on config - # (should this need to be much more sophisticated, reconsider the workflow) - if 'FormEdits' in config['Survey']: + form_df = read_csv(form_fn,dtype=col_types) - form_edits = config['Survey']['FormEdits'] + forms[form_name] = form_df - # loop over each form - for form_name, edits in form_edits.items(): + # create some standard dataframe modification functions + def add_column(df,coln,value): + df[coln]=value + return - form_df = forms[form_name] + def remove_column(df,coln,value): + del df[coln] + return - # loop over each type of edit - for func_type, 
columns in edits.items(): + def replace_column(df,coln,value): + df[coln]=value + return - # check the function is available - assert func_type in func_types + def filter_by_column(df,coln,value): + # CAUTION: This requires surveyor to provide the correct country + df.drop(df.loc[df[coln]!=value].index,inplace=True) + #TODO : for Kenya data, provide a coordinate-based filter + return - # loop over each column to modify - for coln,val in columns.items(): + def filter_by_list(df,coln,values): + # CAUTION: This requires surveyor to provide the correct list of countries + df.drop(df.loc[~df[coln].isin(values)].index,inplace=True) + return - # apply the edit - func_types[func_type](form_df,coln,val) + func_types = { + 'add': add_column, + 'remove' : remove_column, + 'replace' : replace_column, + 'filter' : filter_by_column, + 'filter_by_list' : filter_by_list + } - # Merge additional SurveyData files and rearrange columns to be consistent - # Assumes that the same columns are present in all forms - # and that the first form is the standard + # simple format alignment using edits on config + # (should this need to be much more sophisticated, reconsider the workflow) + if 'FormEdits' in config['Survey']: - first=True - for dfi in forms.values(): + form_edits = config['Survey']['FormEdits'] - if first: - standard_columns = dfi.columns.tolist() - dfm = dfi + # loop over each form + for form_name, edits in form_edits.items(): - logger.debug(f"First processed form contains {dfm.shape[0]} records") + form_df = forms[form_name] - first=False - continue + # loop over each type of edit + for func_type, columns in edits.items(): - # re-order columns to match first case (presumed standard format) - # and fill missing columns with empty strings - dfi = dfi.reindex(standard_columns,fill_value='',axis='columns') + # check the function is available + assert func_type in func_types - logger.debug(f"Next processed form contains {dfi.shape[0]} records") + # loop over each column to modify + for coln,val in columns.items(): - dfm = concat([dfm,dfi],axis='rows') + # apply the edit + func_types[func_type](form_df,coln,val) - # save the result - Export_csv_path = f"{jobPath}/ExportCSV/" - Path(Export_csv_path).mkdir(parents = True, exist_ok = True) - forms_fn = f"{Export_csv_path}/Merged_SurveyData.csv" - dfm.to_csv(forms_fn,index=False,quoting=csv.QUOTE_MINIMAL) + # Merge additional SurveyData files and rearrange columns to be consistent + # Assumes that the same columns are present in all forms + # and that the first form is the standard - logger.debug(f"Preparing to apply removals and additions to survey data") + first=True + for dfi in forms.values(): - processed_surveys_filepath = f"{Export_csv_path}/Processed_SurveyData.csv" + if first: + standard_columns = dfi.columns.tolist() + dfm = dfi - survey_errors_to_remove_filepath = f"{config['ResourcesPath']}/coordinator/assets/SURVEYDATA_MANUAL/SurveyDataErrorsToRemove.csv" - survey_additions_filepath = f"{config['ResourcesPath']}/coordinator/assets/SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv" - - # perform here in python, using the 'KEY' column - # check the key column is unique, if not raise a warning and remove duplicates - - if dfm['KEY'].unique().size != dfm['KEY'].size: - status.reset('WARNING') - logger.warning(f"KEY column is not unique, removing duplicates") - # count the number of duplicates - n_duplicates = dfm.shape[0] - dfm['KEY'].unique().size - # drop the duplicates - dfm = dfm.drop_duplicates(keep='first') - logger.warning(f"Removed {n_duplicates} 
duplicates") + self.logger.debug(f"First processed form contains {dfm.shape[0]} records") - df_rm = read_csv(survey_errors_to_remove_filepath,dtype='str') - keys_to_rm = df_rm['KEY'] + first=False + continue - # check that all of the keys to remove exist in the original data - rm_keys_found = df_rm['KEY'].isin(dfm['KEY']) - n_rm_keys_found = rm_keys_found.sum() - n_rm_keys = rm_keys_found.size - if not np_all(rm_keys_found): - # this might happen if the run date is in the past - logger.warning(f"Only found {n_rm_keys_found} of {n_rm_keys} survey errors to remove") + # re-order columns to match first case (presumed standard format) + # and fill missing columns with empty strings + dfi = dfi.reindex(standard_columns,fill_value='',axis='columns') - rm_keys_not_found = df_rm[~rm_keys_found] - logger.debug(f"Erroneous entries not found are:\n{rm_keys_not_found}") + self.logger.debug(f"Next processed form contains {dfi.shape[0]} records") - logger.debug(f"Type of keys that can be found include:\n{dfm['KEY'].dtype}") + dfm = concat([dfm,dfi],axis='rows') - dfm_short_keys = [val for val in dfm['KEY'].values if len(str(val)) <10] - logger.debug(f"Keys that can be found include:\n{dfm_short_keys}") + # save the result + Export_csv_path = f"{jobPath}/ExportCSV/" + Path(Export_csv_path).mkdir(parents = True, exist_ok = True) + forms_fn = f"{Export_csv_path}/Merged_SurveyData.csv" + dfm.to_csv(forms_fn,index=False,quoting=csv.QUOTE_MINIMAL) - # identify which surveys to remove - idx_to_rm = dfm['KEY'].apply(lambda cell: cell in keys_to_rm.values) + self.logger.debug(f"Preparing to apply removals and additions to survey data") - #drop them in-place - dfm = dfm[~idx_to_rm] - logger.info(f"Removed {n_rm_keys_found} erroneous surveys") + processed_surveys_filepath = f"{Export_csv_path}/Processed_SurveyData.csv" - # add the extra entries - df_add = read_csv(survey_additions_filepath,dtype='str') - n_add_keys = df_add.shape[0] - df_join = concat([dfm,df_add]) - assert dfm.shape[0]+df_add.shape[0] == df_join.shape[0], 'Unexpected result of including additional surveys' + survey_errors_to_remove_filepath = f"{config['ResourcesPath']}/coordinator/assets/SURVEYDATA_MANUAL/SurveyDataErrorsToRemove.csv" + survey_additions_filepath = f"{config['ResourcesPath']}/coordinator/assets/SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv" - logger.info(f"Added {n_add_keys} additional surveys") + # perform here in python, using the 'KEY' column + # check the key column is unique, if not raise a warning and remove duplicates - # save as processed - df_join.to_csv(processed_surveys_filepath,index=False,quoting=csv.QUOTE_MINIMAL) + if dfm['KEY'].unique().size != dfm['KEY'].size: + status.reset('WARNING') + self.logger.warning(f"KEY column is not unique, removing duplicates") + # count the number of duplicates + n_duplicates = dfm.shape[0] - dfm['KEY'].unique().size + # drop the duplicates + dfm = dfm.drop_duplicates(keep='first') + self.logger.warning(f"Removed {n_duplicates} duplicates") - logger.debug('Preparing clustering calculation') + df_rm = read_csv(survey_errors_to_remove_filepath,dtype='str') + keys_to_rm = df_rm['KEY'] - date = datetime.datetime.now() + # check that all of the keys to remove exist in the original data + rm_keys_found = df_rm['KEY'].isin(dfm['KEY']) + n_rm_keys_found = rm_keys_found.sum() + n_rm_keys = rm_keys_found.size + if not np_all(rm_keys_found): + # this might happen if the run date is in the past + self.logger.warning(f"Only found {n_rm_keys_found} of {n_rm_keys} survey errors to remove") - # 
prepare environment for clustering calc - call_R = False + rm_keys_not_found = df_rm[~rm_keys_found] + self.logger.debug(f"Erroneous entries not found are:\n{rm_keys_not_found}") - upload_directory = f"{jobPath}/upload" - Path(upload_directory).mkdir(parents=True, exist_ok=True) + self.logger.debug(f"Type of keys that can be found include:\n{dfm['KEY'].dtype}") - if call_R: + dfm_short_keys = [val for val in dfm['KEY'].values if len(str(val)) <10] + self.logger.debug(f"Keys that can be found include:\n{dfm_short_keys}") - cluster_calc_path = "/storage/app/EWS_prod/code/wheat_source_generation/" + # identify which surveys to remove + idx_to_rm = dfm['KEY'].apply(lambda cell: cell in keys_to_rm.values) - # clear old output - old_clustering_output_glob = f"{cluster_calc_path}/output/sources_*" - old_clustering_outputs = glob(old_clustering_output_glob) + #drop them in-place + dfm = dfm[~idx_to_rm] + self.logger.info(f"Removed {n_rm_keys_found} erroneous surveys") - logger.info('About to unlink old output from clustering calculation') - for path in old_clustering_outputs: - logger.info(f"unlinking {path}") - Path(path).unlink() + # add the extra entries + df_add = read_csv(survey_additions_filepath,dtype='str') + n_add_keys = df_add.shape[0] + df_join = concat([dfm,df_add]) + assert dfm.shape[0]+df_add.shape[0] == df_join.shape[0], 'Unexpected result of including additional surveys' + self.logger.info(f"Added {n_add_keys} additional surveys") - RPath = '/usr/local/R/bin/Rscript' + # save as processed + df_join.to_csv(processed_surveys_filepath,index=False,quoting=csv.QUOTE_MINIMAL) - clustering_script = f"{cluster_calc_path}/code/R/clustering.R" + self.logger.debug('Preparing clustering calculation') - clustering_env = { - **os.environ, - 'R_LIBS':'/home/ewsmanager/R-packages-EWS-clustering/x86_64-pc-linux-gnu-library/3.5', - 'PROJ_LIB' : '/usr/share/proj/', # conda env breaks the automatic assignment of PROJ_LIB - } + date = datetime.datetime.now() - clustering_config = config['Survey']['SourcesConfigFilename'] - assert os.path.isfile(clustering_config) + # prepare environment for clustering calc + call_R = False - clustering_calc = [RPath, - '--no-init-file', - clustering_script, - processed_surveys_filepath, - config['StartString'], - '-2', - '7', - config['Survey']['SourcesConfigFilename']] + upload_directory = f"{jobPath}/upload" + Path(upload_directory).mkdir(parents=True, exist_ok=True) - logger.debug('Performing clustering calculation') + if call_R: - description_short = 'wheat-source-generation' - description_long = 'source calculation on processed surveys' + cluster_calc_path = "/storage/app/EWS_prod/code/wheat_source_generation/" - try: - subprocess_and_log(clustering_calc, description_short, description_long, env=clustering_env) - except: - status.reset('ERROR') - endJob(status,premature=True) + # clear old output + old_clustering_output_glob = f"{cluster_calc_path}/output/sources_*" + old_clustering_outputs = glob(old_clustering_output_glob) - logger.debug('Checking output of clustering calculation') + self.logger.info('About to unlink old output from clustering calculation') + for path in old_clustering_outputs: + self.logger.info(f"unlinking {path}") + Path(path).unlink() - try: - logger.debug('Trying to copy the dataset processed for clustering') - clustering_proc_path_glob = f"{cluster_calc_path}/output/survey_data_processed_{config['Survey']['SourcesRegionName']}_{date.strftime('%Y-%m-%d')}_*.csv" - clustering_proc_path_list = glob(clustering_proc_path_glob) - if 
len(clustering_proc_path_list) == 0: - logger.debug(f"No processed files produced from clustering in {clustering_proc_path_glob}") - raise Exception + RPath = '/usr/local/R/bin/Rscript' - elif len(clustering_proc_path_list) > 1: - logger.debug(f"Multiple processed files produced from clustering in {clustering_proc_path_glob}") - raise Exception + clustering_script = f"{cluster_calc_path}/code/R/clustering.R" - else: - logger.debug('Found 1 processed file, placing copy of result in job directory') + clustering_env = { + **os.environ, + 'R_LIBS':'/home/ewsmanager/R-packages-EWS-clustering/x86_64-pc-linux-gnu-library/3.5', + 'PROJ_LIB' : '/usr/share/proj/', # conda env breaks the automatic assignment of PROJ_LIB + } - proc_filename = f"survey_data_processed_{config['StartString']}.csv" - proc_path = f"{output_directory}/{proc_filename}" + clustering_config = config['Survey']['SourcesConfigFilename'] + assert os.path.isfile(clustering_config) - logger.debug(f"as {proc_path}") + clustering_calc = [RPath, + '--no-init-file', + clustering_script, + processed_surveys_filepath, + config['StartString'], + '-2', + '7', + config['Survey']['SourcesConfigFilename']] - copyfile(clustering_proc_path_list[0], proc_path) + self.logger.debug('Performing clustering calculation') - except: - logger.debug('Failed to get a copy of the dataset processed for clustering') + description_short = 'wheat-source-generation' + description_long = 'source calculation on processed surveys' - clustering_output_path_glob = f"{cluster_calc_path}/output/sources_{config['Survey']['SourcesRegionName']}_{date.strftime('%Y-%m-%d')}_*.csv" - clustering_output_path_list = glob(clustering_output_path_glob) - if len(clustering_output_path_list) == 0: - logger.error(f"No output produced from clustering in {clustering_output_path_glob}") - status.reset('ERROR') - endJob(status,premature=True) - if len(clustering_output_path_list) > 1: - logger.error(f"Multiple outputs produced from clustering in {clustering_output_path_glob}") - status.reset('ERROR') - endJob(status,premature=True) + try: + subprocess_and_log(clustering_calc, description_short, description_long, env=clustering_env) + except: + status.reset('ERROR') + endJob(status,premature=True) - sources_path = clustering_output_path_list[0] + self.logger.debug('Checking output of clustering calculation') - elif 'Groups' in config['Survey']: - # if 'Groups' is defined in the config, create grouped survey files and run python version - - logger.debug('Preparing grouped survey files') - group_directory = f"{jobPath}/Groups" - Path(group_directory).mkdir(parents=True, exist_ok=True) + try: + self.logger.debug('Trying to copy the dataset processed for clustering') - origins_list = df_join["Origin"].unique() - groups = {i:[i] for i in origins_list} + clustering_proc_path_glob = f"{cluster_calc_path}/output/survey_data_processed_{config['Survey']['SourcesRegionName']}_{date.strftime('%Y-%m-%d')}_*.csv" + clustering_proc_path_list = glob(clustering_proc_path_glob) + if len(clustering_proc_path_list) == 0: + self.logger.debug(f"No processed files produced from clustering in {clustering_proc_path_glob}") + raise Exception - assert not np_any([k in origins_list for k in config['Survey']['Groups'].keys()]) + elif len(clustering_proc_path_list) > 1: + self.logger.debug(f"Multiple processed files produced from clustering in {clustering_proc_path_glob}") + raise Exception - groups.update(config['Survey']['Groups']) - - # remove groups that are listed in GroupsToIgnore - if 'GroupsToIgnore' in 
config['Survey']: - for group_name in config['Survey']['GroupsToIgnore']: - if group_name in groups: - logger.info(f"Removing group {group_name} from list of groups") - del groups[group_name] - - for group_name,group_content in groups.items(): + else: + self.logger.debug('Found 1 processed file, placing copy of result in job directory') - logger.info(f"Creating survey group {group_name} which includes {group_content}") - - df_group = df_join.loc[df_join["Origin"].isin(group_content)] - - group_surveys_filename = f"surveys_{group_name}.csv" - group_surveys_filepath = f"{group_directory}/{group_surveys_filename}" - - df_group.to_csv(group_surveys_filepath, index=False, quoting=csv.QUOTE_MINIMAL) - - output_directory = f"{jobPath}/source_gen/{group_name}" + proc_filename = f"survey_data_processed_{config['StartString']}.csv" + proc_path = f"{output_directory}/{proc_filename}" + + self.logger.debug(f"as {proc_path}") + + copyfile(clustering_proc_path_list[0], proc_path) + + except: + self.logger.debug('Failed to get a copy of the dataset processed for clustering') + + clustering_output_path_glob = f"{cluster_calc_path}/output/sources_{config['Survey']['SourcesRegionName']}_{date.strftime('%Y-%m-%d')}_*.csv" + clustering_output_path_list = glob(clustering_output_path_glob) + if len(clustering_output_path_list) == 0: + self.logger.error(f"No output produced from clustering in {clustering_output_path_glob}") + status.reset('ERROR') + endJob(status,premature=True) + if len(clustering_output_path_list) > 1: + self.logger.error(f"Multiple outputs produced from clustering in {clustering_output_path_glob}") + status.reset('ERROR') + endJob(status,premature=True) + + sources_path = clustering_output_path_list[0] + + elif 'Groups' in config['Survey']: + # if 'Groups' is defined in the config, create grouped survey files and run python version + + self.logger.debug('Preparing grouped survey files') + group_directory = f"{jobPath}/Groups" + Path(group_directory).mkdir(parents=True, exist_ok=True) + + origins_list = df_join["Origin"].unique() + groups = {i:[i] for i in origins_list} + + assert not np_any([k in origins_list for k in config['Survey']['Groups'].keys()]) + + groups.update(config['Survey']['Groups']) + + # remove groups that are listed in GroupsToIgnore + if 'GroupsToIgnore' in config['Survey']: + for group_name in config['Survey']['GroupsToIgnore']: + if group_name in groups: + self.logger.info(f"Removing group {group_name} from list of groups") + del groups[group_name] + + for group_name,group_content in groups.items(): + + self.logger.info(f"Creating survey group {group_name} which includes {group_content}") + + df_group = df_join.loc[df_join["Origin"].isin(group_content)] + + group_surveys_filename = f"surveys_{group_name}.csv" + group_surveys_filepath = f"{group_directory}/{group_surveys_filename}" + + df_group.to_csv(group_surveys_filepath, index=False, quoting=csv.QUOTE_MINIMAL) + + output_directory = f"{jobPath}/source_gen/{group_name}" + Path(output_directory).mkdir(parents=True, exist_ok=True) + + sources_path = run_case( + config_path = config['Survey']['pySourcesConfigFilename'], + survey_path = group_surveys_filepath, + reference_date = config['StartString'], + # Day 0 (current day) is always included + # Days -2,-1 and 0 are needed to initialise spores in NAME + # days 1-7 are forecast days + # days 8 and 9 are buffers in case NAME needs to run with the + # previous day's source job + day_offsets = [-2,9], + output_dir = output_directory) + + self.logger.debug('Placing copy of 
result in job directory with conventional name') + + output_filename = f"sources_{group_name}_{config['StartString']}.csv" + output_path = f"{jobPath}/upload/{output_filename}" + + self.logger.debug(f"as {output_path}") + + copyfile(sources_path, output_path) + else: + # run python version without grouping surveys + + output_directory = f"{jobPath}/source_gen" Path(output_directory).mkdir(parents=True, exist_ok=True) sources_path = run_case( config_path = config['Survey']['pySourcesConfigFilename'], - survey_path = group_surveys_filepath, + survey_path = processed_surveys_filepath, reference_date = config['StartString'], # Day 0 (current day) is always included # Days -2,-1 and 0 are needed to initialise spores in NAME # days 1-7 are forecast days - # days 8 and 9 are buffers in case NAME needs to run with the + # days 8 and 9 are buffers in case NAME needs to run with the # previous day's source job day_offsets = [-2,9], output_dir = output_directory) - - logger.debug('Placing copy of result in job directory with conventional name') - - output_filename = f"sources_{group_name}_{config['StartString']}.csv" - output_path = f"{jobPath}/upload/{output_filename}" - logger.debug(f"as {output_path}") + self.logger.debug('Placing copy of result in job directory with conventional name') - copyfile(sources_path, output_path) - else: - # run python version without grouping surveys - - output_directory = f"{jobPath}/source_gen" - Path(output_directory).mkdir(parents=True, exist_ok=True) - - sources_path = run_case( - config_path = config['Survey']['pySourcesConfigFilename'], - survey_path = processed_surveys_filepath, - reference_date = config['StartString'], - # Day 0 (current day) is always included - # Days -2,-1 and 0 are needed to initialise spores in NAME - # days 1-7 are forecast days - # days 8 and 9 are buffers in case NAME needs to run with the - # previous day's source job - day_offsets = [-2,9], - output_dir = output_directory) - - logger.debug('Placing copy of result in job directory with conventional name') - - output_filename = f"sources_{config['StartString']}.csv" - output_path = f"{jobPath}/upload/{output_filename}" + output_filename = f"sources_{config['StartString']}.csv" + output_path = f"{jobPath}/upload/{output_filename}" - logger.debug(f"as {output_path}") + self.logger.debug(f"as {output_path}") - copyfile(sources_path, output_path) + copyfile(sources_path, output_path) - upload_filenames = f"sources_*{config['StartString']}.csv" - upload_path = f"{jobPath}/upload/{upload_filenames}" + upload_filenames = f"sources_*{config['StartString']}.csv" + upload_path = f"{jobPath}/upload/{upload_filenames}" - # glob list of output files - upload_path_list = glob(upload_path) + # glob list of output files + upload_path_list = glob(upload_path) - proc_out = {} - # Output files available for upload - proc_out['output'] = upload_path_list - # Processing files available for clearing - proc_out['clearup'] = None + proc_out = {} + # Output files available for upload + proc_out['output'] = upload_path_list + # Processing files available for clearing + proc_out['clearup'] = None - return proc_out + return proc_out -#TODO -def process_EWS_plotting_survey(jobPath,config): - '''Returns a list of output files for transfer.''' + #TODO + def process_EWS_plotting_survey(self, jobPath,config): + '''Returns a list of output files for transfer.''' - logger.info('started process_EWS_plotting_survey(), nothing to do') + self.logger.info('started process_EWS_plotting_survey(), nothing to do') - pass - 
return [] + pass + return [] diff --git a/tests/integration/partial/integration_test_utils.py b/tests/integration/partial/integration_test_utils.py index 14c5e7ee977ab65ad11566c74e02ff7cb8a79ce6..c52d32f060f2925f8512962bb9d607386765382e 100644 --- a/tests/integration/partial/integration_test_utils.py +++ b/tests/integration/partial/integration_test_utils.py @@ -4,7 +4,7 @@ import glob import json import os from importlib import reload -from typing import List +from typing import List, Type from unittest import TestSuite, TestLoader, TestCase, TestResult from zipfile import ZipFile @@ -169,22 +169,20 @@ class IntegrationTestUtils: @staticmethod def run_partial_integration_test_pipeline(component: str, start_date: str, + processor: Processor, **kwargs): """ Runs the "run_Process" function in Processor.py with the given arguments for the partial integration tests. The full integration pipeline is run in the "run_full_integration_test_pipeline" function. + :param processor: :param component: :param start_date: :param kwargs: :return: """ - # need EMAIL_CRED in the environment before we import Processor - os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH - processor = ProcessorDeposition() - args_dict: dict = {} # note, possible to override these values in the kwargs loop below diff --git a/tests/integration/partial/test_advisory.py b/tests/integration/partial/test_advisory.py index c38bd377c16d87585472a068a7a9ba0d0aec526f..d8558cd7459895a0e0508f9fbc671c9225b5861c 100644 --- a/tests/integration/partial/test_advisory.py +++ b/tests/integration/partial/test_advisory.py @@ -2,6 +2,7 @@ import copy import os import unittest +from ProcessorAdvisory import ProcessorAdvisory from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.advisory_test_suite import BaseAdvisoryTestSuite @@ -68,8 +69,12 @@ class TestAdvisory(BaseAdvisoryTestSuite.AdvisoryTestSuite): @staticmethod def run_advisory_pipeline(): component = 'Advisory' - IntegrationTestUtils.run_partial_integration_test_pipeline(component, IntegrationTestUtils.TEST_START_DATE) - + # need EMAIL_CRED in the environment before we create a Processor + os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH + advisory_processor = ProcessorAdvisory() + IntegrationTestUtils.run_partial_integration_test_pipeline(component, + IntegrationTestUtils.TEST_START_DATE, + advisory_processor) if __name__ == '__main__': unittest.main() diff --git a/tests/integration/partial/test_deposition.py b/tests/integration/partial/test_deposition.py index ace57114737eacac6385602e4f0976b341d34997..96268cb9510e7a6d5fc71c4f0474a43b622d139f 100644 --- a/tests/integration/partial/test_deposition.py +++ b/tests/integration/partial/test_deposition.py @@ -2,6 +2,7 @@ import copy import os import unittest +from ProcessorDeposition import ProcessorDeposition from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.depo_test_suite import BaseDepoTestSuite @@ -63,7 +64,12 @@ class TestDeposition(BaseDepoTestSuite.DepoTestSuite): @staticmethod def run_depo_pipeline(): component = 'Deposition' - IntegrationTestUtils.run_partial_integration_test_pipeline(component, IntegrationTestUtils.TEST_START_DATE) + # need EMAIL_CRED in the environment before we create a Processor + os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH + deposition_processor = ProcessorDeposition() + IntegrationTestUtils.run_partial_integration_test_pipeline(component, + 
IntegrationTestUtils.TEST_START_DATE, + deposition_processor) if __name__ == '__main__': diff --git a/tests/integration/partial/test_env_suit.py b/tests/integration/partial/test_env_suit.py index 4e41acc718f60cf76c5fd0faee9cabe71bbda98d..11169a6601406b1bb4c8a193dd9f2ce72097f47f 100644 --- a/tests/integration/partial/test_env_suit.py +++ b/tests/integration/partial/test_env_suit.py @@ -2,6 +2,7 @@ import copy import os import unittest +from ProcessorEnvironment import ProcessorEnvironment from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.env_suit_test_suite import BaseEnvSuitTestSuite @@ -41,8 +42,8 @@ class TestEnvSuit(BaseEnvSuitTestSuite.EnvSuitTestSuite): @staticmethod def write_temp_run_config_file(): nowstring: str = IntegrationTestUtils.get_now_string() - # prefix: str = "temp_env_" + nowstring - prefix: str = "temp_env" + prefix: str = "temp_env_" + nowstring + #prefix: str = "temp_env" default_config = IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config) @@ -66,7 +67,12 @@ class TestEnvSuit(BaseEnvSuitTestSuite.EnvSuitTestSuite): @staticmethod def run_env_pipeline(): component = 'Environment' - IntegrationTestUtils.run_partial_integration_test_pipeline(component, IntegrationTestUtils.TEST_START_DATE) + # need EMAIL_CRED in the environment before we create a Processor + os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH + environment_processor = ProcessorEnvironment() + IntegrationTestUtils.run_partial_integration_test_pipeline(component, + IntegrationTestUtils.TEST_START_DATE, + environment_processor) if __name__ == '__main__': diff --git a/tests/integration/partial/test_epi.py b/tests/integration/partial/test_epi.py index 6f1d85ed3f69ac66a194d37b3680e31f1bff6c99..af7fb9eb307cf9fc9fa60d94402086383dffd366 100644 --- a/tests/integration/partial/test_epi.py +++ b/tests/integration/partial/test_epi.py @@ -2,6 +2,7 @@ import copy import os import unittest +from ProcessorEpidemiology import ProcessorEpidemiology from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.epi_test_suite import BaseEpiTestSuite @@ -67,7 +68,12 @@ class TestEpi(BaseEpiTestSuite.EpiTestSuite): @staticmethod def run_epi_pipeline(): component = 'Epidemiology' - IntegrationTestUtils.run_partial_integration_test_pipeline(component, IntegrationTestUtils.TEST_START_DATE) + # need EMAIL_CRED in the environment before we create a Processor + os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH + epi_processor = ProcessorEpidemiology() + IntegrationTestUtils.run_partial_integration_test_pipeline(component, + IntegrationTestUtils.TEST_START_DATE, + epi_processor) if __name__ == '__main__': diff --git a/tests/integration/partial/test_survey.py b/tests/integration/partial/test_survey.py index a770fed939f046e069d6fe97165dcf7ad601dfdd..0026f5206ed6771858d74b04ef95f4888b1fda53 100644 --- a/tests/integration/partial/test_survey.py +++ b/tests/integration/partial/test_survey.py @@ -2,6 +2,7 @@ import copy import os import unittest +from ProcessorSurveys import ProcessorSurveys from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.survey_test_suite import BaseSurveyTestSuite @@ -64,7 +65,12 @@ class TestSurvey(BaseSurveyTestSuite.SurveyTestSuite): @staticmethod def run_survey_pipeline(): component = 'Survey' - IntegrationTestUtils.run_partial_integration_test_pipeline(component, 
IntegrationTestUtils.TEST_START_DATE) + # need EMAIL_CRED in the environment before we create a Processor + os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH + survey_processor = ProcessorSurveys() + IntegrationTestUtils.run_partial_integration_test_pipeline(component, + IntegrationTestUtils.TEST_START_DATE, + survey_processor) if __name__ == '__main__':
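# --- Editorial note: illustrative sketch only, not part of the patch ---------
# After this refactor every partial integration test wires itself up the same
# way: set EMAIL_CRED, construct the concrete Processor subclass, and pass it
# to run_partial_integration_test_pipeline. A hypothetical further component
# ('Example' and ProcessorExample below are placeholders, not classes that
# exist in the repository) would repeat the pattern like this:

import os

from ProcessorExample import ProcessorExample  # hypothetical Processor subclass
from integration.partial.integration_test_utils import IntegrationTestUtils


def run_example_pipeline():
    component = 'Example'  # placeholder component name
    # need EMAIL_CRED in the environment before we create a Processor
    os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH
    example_processor = ProcessorExample()
    IntegrationTestUtils.run_partial_integration_test_pipeline(component,
                                                               IntegrationTestUtils.TEST_START_DATE,
                                                               example_processor)
# ------------------------------------------------------------------------------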