# NOTE: repository web-page residue ("Something went wrong on our end", "Code
# owners", file-size banner) removed — it was export noise, not source code.
#ProcessorEnvironment.py
'''Functions to process the environment component.'''
# standard library
from distutils.dir_util import copy_tree
from glob import glob
import logging
import os
from pathlib import Path
import shutil
from string import Template
import tarfile

# third party
import iris
from iris.cube import CubeList

# local
from Processor import Processor
from ProcessorServer import process_pre_job_server_download
from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor
from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo
import EnvSuitPipeline as esp
from ProcessorUtils import (
    get_only_existing_globs,
    subprocess_and_log,
    remove_path_from_tar_members,
    short_name, add_filters_to_sublogger
)
class ProcessorEnvironment(Processor):
    '''Processor for the environmental-suitability component.

    Downloads met data from a remote server, sanity-checks it, runs the
    :mod:`EnvSuitPipeline` pipeline, and post-processes the results with
    EWS-plotting.'''

    # A 7-day forecast is expected to contain this many timepoints in every
    # met file (land_fraction/topography excepted — they are static fields).
    EXPECTED_TIMEPOINTS = 57

    def process_pre_job(self, args):
        '''Pre-job stage: check/download availability on the remote server.'''
        return process_pre_job_server_download(args)

    def process_in_job(self, jobPath, status, configjson, component) -> object:
        '''In-job stage: delegates to :meth:`process_in_job_env2_0`.'''
        return self.process_in_job_env2_0(jobPath, status, configjson, component)

    def process_post_job(self, jobPath, configjson):
        '''Post-job stage: delegates to :meth:`process_EWS_plotting_env2_0`.'''
        return self.process_EWS_plotting_env2_0(jobPath, configjson)

    def __init__(self) -> None:
        super().__init__()
        # self.logger is expected to be provided by the Processor base class
        # (TODO confirm); here we only attach the standard filters to this
        # component's sub-logger.
        logger = logging.getLogger('Processor.Environment')
        add_filters_to_sublogger(logger)

    def _verify_forecast_cubes(self, output_directory):
        '''Basic check that the unpacked netcdf contents are as expected for a
        7-day forecast (EXPECTED_TIMEPOINTS timepoints in all files).

        :raises RuntimeError: if any non-static cube has an unexpected
            number of timepoints.'''

        cube_wildcard = f"{output_directory}/*.nc"
        cubes: CubeList = iris.load(cube_wildcard)

        # land_fraction and topography will only have a single timepoint (as
        # these dont change over time), so we can ignore these when
        # sense-checking the expected number of timepoints
        ignore_list = ["LAND_FRACTION", "TOPOGRAPHY"]

        for cube in cubes:
            var_name = cube.name()
            coord = cube.coord("time")
            timepoint_count = coord.shape[0]
            if timepoint_count != self.EXPECTED_TIMEPOINTS and var_name not in ignore_list:
                msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}"
                self.logger.error(msg)
                raise RuntimeError(msg)

    def process_in_job_env2_0(self, jobPath, status, config, component):
        '''Download met data from remote, prepare data, and run
        :class:`EnvSuitPipeline` pipeline.

        Returns a dict with keys 'output' (files available for upload; None
        here) and 'clearup' (files that can be deleted after the job).'''

        self.logger.info('started process_in_job_env2_0()')

        self.logger.info('Copying file from remote server to job directory')

        # TODO: check if file exists already (may be the case for multiple configs in one)
        # TODO: perform ssh file transfer in python instead of subprocess
        file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
        file_name = Template(config[component]['InputFileTemplate']).substitute(**config)

        server_name: str = config['ServerName']
        if server_name == "":
            # an empty server name means the tarball is on the local filesystem
            cmd_scp: list = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath]
        else:
            cmd_scp: list = ["scp", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no",
                             f"{config['ServerName']}:{file_path}/{file_name}.tar.gz", jobPath]

        description_short = 'env2 scp'
        description_long = 'Copying file from remote server to job directory'
        subprocess_and_log(cmd_scp, description_short, description_long)

        self.logger.info('untarring the input file')

        # untar incoming name data into the job directory
        output_directory = f"{jobPath}/NAME_Met_as_netcdf"
        Path(output_directory).mkdir(parents=True, exist_ok=True)
        tarfile_name = f"{jobPath}/{file_name}.tar.gz"
        with tarfile.open(tarfile_name) as tar:
            # strip any leading path components from the archive members so
            # the files land directly in output_directory
            members = remove_path_from_tar_members(tar)
            tar.extractall(output_directory, members=members)

        # basic check that contents are as expected for a 7-day forecast
        self._verify_forecast_cubes(output_directory)

        region = config['RegionName']

        self.logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear")

        pipeline_config = config["Environment"]
        try:
            esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False)
        except Exception:
            # narrowed from a bare `except:` — still logged and re-raised,
            # but no longer intercepts SystemExit/KeyboardInterrupt
            self.logger.exception(f"Some failure when running EnvSuitPipeline.py")
            raise

        self.logger.info('Finished running environmental suitability 2.0')

        # TODO: Check that the output appears as expected

        proc_out = {}
        # Output files available for upload
        proc_out['output'] = None
        # Processing files available for clearing
        proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"]

        return proc_out

    def process_copy_past_job_env2_0(self, jobPath, status, config, component):
        '''For when we want to skip process_in_job() to test the other
        components of this script. Currently hard-wired.'''

        # TODO: remove this hard-wired assumption
        jobPath_to_copy = f"{jobPath}/../{short_name['Environment']}_{config['StartString']}_bak/"

        assert os.path.exists(jobPath_to_copy)

        dir_src = f"{jobPath_to_copy}/processed/"
        dir_dst = f"{jobPath}/processed/"

        self.logger.info(f"Copying from {dir_src}")
        self.logger.info(f"to {dir_dst}")

        # shutil.copytree replaces the deprecated distutils copy_tree
        # (distutils is removed in Python 3.12); dirs_exist_ok=True keeps the
        # merge-into-existing-destination behaviour of copy_tree.
        shutil.copytree(dir_src, dir_dst, dirs_exist_ok=True)

        self.logger.info('Copying complete')

        proc_out = {}
        # Output files available for upload
        proc_out['output'] = None
        # Processing files available for clearing
        proc_out['clearup'] = None

        return proc_out

    def process_EWS_plotting_env2_0(self, jobPath, config):
        '''Configures the plotting arguments and calls EWS-plotting as a
        python module.

        Returns a sorted list of output files for transfer.

        :raises RuntimeError: if EWS-plotting produced no output at all.'''

        self.logger.info('started process_EWS_plotting_env2_0()')

        main_region = config['RegionName']
        input_dir = f"{jobPath}/processed/{main_region}"
        subregions = config['SubRegionNames']

        EWSPlottingOutputGlobs = []

        # work on each region
        for region in subregions:

            output_dir = f"{jobPath}/plotting/{region.lower()}"
            csv_template_dir = input_dir + "/{DISEASE_DIR}/RIE_value.csv"

            Path(output_dir).mkdir(parents=True, exist_ok=True)

            plotting_config = config['Environment']['EWS-Plotting']
            sys_config = plotting_config['SysConfig']
            run_config = plotting_config['RunConfig']
            chart_config = plotting_config[region]['ChartConfig']
            filter_for_country = plotting_config[region]['FilterForCountry']

            # Note that this runs all disease types available
            self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}")

            env_suit_processor = EnvSuitPostProcessor()
            env_suit_processor.set_param_config_files(sys_params_file_arg = sys_config,
                                                      chart_params_file_arg = chart_config,
                                                      run_params_file_arg = run_config,
                                                      es_output_dir_arg = output_dir,
                                                      issue_date_arg = config['StartString'],
                                                      disease_csv_template_arg = csv_template_dir)

            # the config stores the flag as a string; only "TRUE" (any case)
            # enables country filtering
            env_suit_processor.run_params.FILTER_FOR_COUNTRY = (filter_for_country.upper() == "TRUE")

            # Include further diseases in plotting. In this case the irrigated
            # suitabilite for the rusts.
            # TODO: move this part out into a config
            extra_diseases = [
                EnvSuitDiseaseInfo("Stem rust temp-only", "stem_rust_temponly", config['StartString'], "StemRust_TempOnly", csv_template_dir),
                EnvSuitDiseaseInfo("Leaf rust temp-only", "leaf_rust_temponly", config['StartString'], "LeafRust_TempOnly", csv_template_dir),
                EnvSuitDiseaseInfo("Stripe rust temp-only", "stripe_temponly", config['StartString'], "StripeRust_TempOnly", csv_template_dir),
            ]

            env_suit_processor.add_diseases(diseases=extra_diseases)

            env_suit_processor.process()

            # gather all plotting output produced for this region
            EWSPlottingOutputDir = f"{output_dir}/images/"
            EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"]

        # keep only the globs that actually matched files on disk
        EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs, inplace=False)

        # check there is some output from EWS-plotting
        if not EWSPlottingOutputGlobs:
            self.logger.error('EWS-Plotting did not produce any output')
            raise RuntimeError('EWS-Plotting did not produce any output')

        # provide list for transfer
        EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)])

        return EWSPlottingOutputs
if __name__ == '__main__':
    # Script entry point: run the Environment component through the generic
    # processor driver.
    env_processor = ProcessorEnvironment()
    env_processor.run_processor("Environment")