#ProcessorEnvironment.py
'''Functions to process the environment component.'''

from distutils.dir_util import copy_tree
from glob import glob
import logging
from pathlib import Path
import os
from string import Template
import tarfile

import iris
from iris.cube import CubeList

from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor
from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo

import EnvSuitPipeline as esp
from ProcessorUtils import (
        get_only_existing_globs,
        subprocess_and_log,
        add_filters_to_sublogger,
        remove_path_from_tar_members,
        short_name
)

logger = logging.getLogger('Processor.Environment')
add_filters_to_sublogger(logger)

# Number of timepoints every met cube must contain for a 7-day forecast.
_EXPECTED_TIMEPOINT_COUNT = 57

# Cubes that are exempt from the timepoint-count check.
#TODO: find out from Will whether these problem cubes with unexpected time
# counts will be in the production output
_TIMEPOINT_CHECK_IGNORE_LIST = ("LAND_FRACTION", "TOPOGRAPHY")


def _build_scp_command(config, file_path, file_name, jobPath):
    '''Return the scp argument list that copies the input tarball to jobPath.

    An empty config['ServerName'] means the source path is used as-is (no
    host prefix, no key); otherwise the file is fetched from that host using
    the key in config['ServerKey'].'''

    server_name: str = config['ServerName']
    source_file = f"{file_path}/{file_name}.tar.gz"

    if server_name == "":
        return ["scp", source_file, jobPath]

    return [
        "scp",
        "-i", config['ServerKey'],
        "-o", "StrictHostKeyChecking=no",
        f"{server_name}:{source_file}",
        jobPath]


def _extract_met_tarball(tarfile_name, output_directory):
    '''Extract the members of tarfile_name into output_directory.

    The directory is created first if it does not exist.'''

    Path(output_directory).mkdir(parents=True, exist_ok=True)

    with tarfile.open(tarfile_name) as tar:
        # NOTE(review): remove_path_from_tar_members() lives in ProcessorUtils
        # and is not visible here; presumably it flattens member paths.
        # Confirm it also rejects unsafe members (absolute paths, links),
        # because extractall() is given whatever it returns.
        members = remove_path_from_tar_members(tar)
        tar.extractall(output_directory, members = members)


def _check_timepoint_counts(output_directory):
    '''Check every NetCDF cube in output_directory has the expected timepoints.

    Raises RuntimeError for the first cube whose "time" coordinate length is
    not _EXPECTED_TIMEPOINT_COUNT, unless its name is in
    _TIMEPOINT_CHECK_IGNORE_LIST.'''

    cube_wildcard = f"{output_directory}/*.nc"
    cubes: CubeList = iris.load(cube_wildcard)

    for cube in cubes:
        var_name = cube.name()
        timepoint_count = cube.coord("time").shape[0]

        if timepoint_count != _EXPECTED_TIMEPOINT_COUNT and var_name not in _TIMEPOINT_CHECK_IGNORE_LIST:
            msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {var_name}"
            logger.error(msg)
            raise RuntimeError(msg)


def process_in_job_env2_0(jobPath,status,config,component):
    '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.

    Steps: copy the input tarball into jobPath with scp, extract it into
    <jobPath>/NAME_Met_as_netcdf, sanity-check the timepoint count of the
    extracted cubes, then call esp.run_pipeline().

    Args:
        jobPath: job directory that receives the tarball and extracted data.
        status: not used by this function.
        config: configuration mapping; the keys read here are 'ServerName',
            'ServerKey' (only when 'ServerName' is non-empty), 'RegionName',
            'StartString', 'Environment' and config[component].
        component: key into config whose entry provides 'ServerPathTemplate'
            and 'InputFileTemplate'.

    Returns:
        dict with 'output' set to None and 'clearup' set to a list holding
        the path of the downloaded tarball.

    Raises:
        RuntimeError: if a cube has an unexpected number of timepoints.
        Exception: anything raised by esp.run_pipeline() is logged and
            re-raised unchanged.'''

    logger.info('started process_in_job_env2_0()')

    logger.info('Copying file from remote server to job directory')

    # Both templates are filled in from the top-level config entries.
    file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
    file_name = Template(config[component]['InputFileTemplate']).substitute(**config)

    #TODO: check if file exists already (may be the case for multiple configs in one)

    # TODO: perform ssh file transfer in python instead of subprocess
    cmd_scp: list = _build_scp_command(config, file_path, file_name, jobPath)

    description_short = 'env2 scp'
    description_long = 'Copying file from remote server to job directory'

    # lawrence comment in/out
    subprocess_and_log(cmd_scp,description_short, description_long)

    logger.info('untarring the input file')

    # untar incoming name data
    output_directory = f"{jobPath}/NAME_Met_as_netcdf"
    tarfile_name = f"{jobPath}/{file_name}.tar.gz"
    _extract_met_tarball(tarfile_name, output_directory)

    # basic check that contents are as expected for 7-day forecast (57 timepoints in all files)
    _check_timepoint_counts(output_directory)

    region = config['RegionName']

    logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear")

    pipeline_config = config["Environment"]
    try:
        #todo lawrence comment this back to original (extracted=False)
        esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False)
    except Exception:
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit are
        # not logged as pipeline failures; the error is still re-raised.
        logger.exception("Some failure when running EnvSuitPipeline.py")
        raise

    logger.info('Finished running environmental suitability 2.0')

    # TODO: Check that the output appears as expected

    proc_out = {}
    # Output files available for upload
    proc_out['output'] = None
    # Processing files available for clearing
    proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"]

    return proc_out


def process_copy_past_job_env2_0(jobPath,status,config,component):
    '''For when we want to skip process_in_job() to test the other components of
    this script. Currently hard-wired.

    Copies the 'processed' directory of a sibling backup job directory
    (<jobPath>/../<short_name['Environment']>_<StartString>_bak/) into
    <jobPath>/processed/.

    Args:
        jobPath: job directory that receives the copied 'processed' tree.
        status: not used by this function.
        config: configuration mapping; only 'StartString' is read here.
        component: not used by this function.

    Returns:
        dict with both 'output' and 'clearup' set to None.

    Raises:
        AssertionError: if the backup job directory does not exist.'''

    # TODO: remove this hard-wired assumption
    jobPath_to_copy = f"{jobPath}/../{short_name['Environment']}_{config['StartString']}_bak/"

    # Explicit check rather than `assert`, which is stripped under `python -O`.
    # AssertionError is kept so callers see the same exception type as before.
    if not os.path.exists(jobPath_to_copy):
        msg = f"Directory to copy from does not exist: {jobPath_to_copy}"
        logger.error(msg)
        raise AssertionError(msg)

    dir_src = f"{jobPath_to_copy}/processed/"

    dir_dst = f"{jobPath}/processed/"

    logger.info(f"Copying from {dir_src}")

    logger.info(f"to {dir_dst}")

    copy_tree(dir_src,dir_dst)

    logger.info('Copying complete')

    proc_out = {}
    # Output files available for upload
    proc_out['output'] = None
    # Processing files available for clearing
    proc_out['clearup'] = None

    return proc_out


# The string below is disabled code kept for reference; as a bare string
# literal it is never executed.
'''class EWSPlottingEnvSuit(EWSPlottingEnvSuitBase):

    def set_custom_params(self,
                          sys_params_dict: dict,
                          chart_params_dict: dict,
                          run_params_dict: dict,
                          disease_csv_template_arg: str,
                          diseases: List[EnvSuitDiseaseInfo]):
        # this is unique to the asia/east africa env suit, as we are not filtering within country boundaries
        run_params_dict[RUN_PARAMS.FILTER_FOR_COUNTRY_KEY] = "False"'''


#TODO test if this works
def process_EWS_plotting_env2_0(jobPath,config):
    '''Configures the plotting arguments and calls EWS-plotting as a python module.
    Returns a list of output files for transfer.

    One EnvSuitPostProcessor run is made per entry of config['SubRegionNames'],
    each writing to <jobPath>/plotting/<region in lower case>.

    Args:
        jobPath: job directory holding 'processed/<RegionName>' input and
            receiving the 'plotting' output.
        config: configuration mapping; the keys read here are 'RegionName',
            'SubRegionNames', 'StartString' and
            config['Environment']['EWS-Plotting'] ('SysConfig', 'RunConfig'
            and, per sub-region, 'ChartConfig' and 'FilterForCountry').

    Returns:
        Sorted list of paths matching '<output_dir>/images/*' across all
        sub-regions.

    Raises:
        RuntimeError: if no output globs remain after
            get_only_existing_globs().'''

    logger.info('started process_EWS_plotting_env2_0()')

    main_region = config['RegionName']

    input_dir = f"{jobPath}/processed/{main_region}"

    # Same for every sub-region, so built once outside the loop.
    csv_template_dir = input_dir + "/{DISEASE_DIR}/RIE_value.csv"

    subregions = config['SubRegionNames']

    EWSPlottingOutputGlobs = []

    # work on each region
    for region in subregions:

        output_dir = f"{jobPath}/plotting/{region.lower()}"

        Path(output_dir).mkdir(parents=True, exist_ok=True)

        plotting_config = config['Environment']['EWS-Plotting']
        sys_config = plotting_config['SysConfig']
        run_config = plotting_config['RunConfig']
        chart_config = plotting_config[region]['ChartConfig']
        filter_for_country = plotting_config[region]['FilterForCountry']

        # Note that this runs all disease types available

        logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}")

        env_suit_processor = EnvSuitPostProcessor()
        env_suit_processor.set_param_config_files(sys_params_file_arg = sys_config,
                                                  chart_params_file_arg = chart_config,
                                                  run_params_file_arg = run_config,
                                                  es_output_dir_arg = output_dir,
                                                  issue_date_arg = config['StartString'],
                                                  disease_csv_template_arg = csv_template_dir)

        # The config value is a string; anything other than "true"
        # (case-insensitive) switches country filtering off.
        env_suit_processor.run_params.FILTER_FOR_COUNTRY = (filter_for_country.upper() == "TRUE")

        # Include further diseases in plotting. In this case the irrigated suitability for the rusts.
        # TODO: move this part out into a config
        # NOTE(review): "stripe_temponly" does not follow the "*_rust_temponly"
        # pattern of the other two entries - confirm this is intended.
        extra_diseases = [
            EnvSuitDiseaseInfo("Stem rust temp-only", "stem_rust_temponly", config['StartString'], "StemRust_TempOnly", csv_template_dir),
            EnvSuitDiseaseInfo("Leaf rust temp-only", "leaf_rust_temponly", config['StartString'], "LeafRust_TempOnly", csv_template_dir),
            EnvSuitDiseaseInfo("Stripe rust temp-only", "stripe_temponly", config['StartString'], "StripeRust_TempOnly", csv_template_dir),
        ]

        env_suit_processor.add_diseases(diseases=extra_diseases)

        env_suit_processor.process()

        # check the output
        EWSPlottingOutputDir = f"{output_dir}/images/"

        # Everything under images/ is collected, rather than separate
        # Daily/ and Weekly/ suitability_*_rust_*.png patterns.
        EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"]

    # check the output
    EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False)

    # check there is some output from EWS-plotting
    if not EWSPlottingOutputGlobs:
        msg = 'EWS-Plotting did not produce any output'
        logger.error(msg)
        raise RuntimeError(msg)

    # provide list for transfer
    EWSPlottingOutputs = sorted(file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str))

    return EWSPlottingOutputs