FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
  • J.W. Smith's avatar
    bce461a1
    feat: Clearup functionality · bce461a1
    J.W. Smith authored
    Optional config argument to delete mid-process files at the end of a
    successful job. There is an associated command-line flag '--clearup'.
    
    The reason for this feature is to provide a way to slim down data
    volumes of production runs. A lot of processing files are generated
    and production runs may only need to retain configuration, logs and
    outputs. Also, the inputs might be archived separately, so they
    wouldn't need to be retained in prod runs.
    
    Currently, files are moved to a hard-coded 'recycling bin' directory,
    and there is a minimal set of files set for clearup: only the deposition
    and environment tar files.
    
    This change requires some standard structure for the outputs of
    processor_in_job functions, which would be more robust if
    a class structure were implemented.
    
    TODO: Implement a test for file clearup.
    bce461a1
    History
    feat: Clearup functionality
    J.W. Smith authored
    Optional config argument to delete mid-process files at the end of a
    successful job. There is an associated command-line flag '--clearup'.
    
    The reason for this feature is to provide a way to slim down data
    volumes of production runs. A lot of processing files are generated
    and production runs may only need to retain configuration, logs and
    outputs. Also, the inputs might be archived separately, so they
    wouldn't need to be retained in prod runs.
    
    Currently, files are moved to a hard-coded 'recycling bin' directory,
    and there is a minimal set of files set for clearup: only the deposition
    and environment tar files.
    
    This change requires some standard structure for the outputs of
    processor_in_job functions, which would be more robust if
    a class structure were implemented.
    
    TODO: Implement a test for file clearup.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ProcessorEnvironment.py 8.10 KiB
#ProcessorEnvironment.py
'''Functions to process the environment component.'''

from distutils.dir_util import copy_tree
from glob import glob
import logging
import os
from pathlib import Path
import shutil
from string import Template
import tarfile

import iris
from iris.cube import CubeList

from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor
from plotting.common.utils import EnvSuitDiseaseInfo

import EnvSuitPipeline as esp
from ProcessorUtils import (
        get_only_existing_globs,
        subprocess_and_log,
        add_filters_to_sublogger,
        remove_path_from_tar_members,
        short_name
)

# Module-level logger, named as a child of the main 'Processor' logger so it
# inherits that logger's handlers and configuration.
logger = logging.getLogger('Processor.Environment')
# Attach the project's standard log filters (defined in ProcessorUtils).
add_filters_to_sublogger(logger)


def process_in_job_env2_0(jobPath,status,config,component):
    '''Fetches the NAME met-data tarball from the remote server, unpacks and
    sanity-checks it, then runs the environmental suitability 2.0 pipeline.

    Returns a dict with the standard processor_in_job structure:
        'output': files available for upload (None here)
        'clearup': mid-process files that may be deleted after a successful
                   job (the downloaded tarball)

    Raises RuntimeError if the unpacked netcdf files do not contain the
    expected 57 timepoints; re-raises any failure from the pipeline.'''

    logger.info('started process_in_job_env2_0()')

    logger.info('Copying file from remote server to job directory')

    # Resolve remote path and file name from the templated config entries
    file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
    file_name = Template(config[component]['InputFileTemplate']).substitute(**config)

    #TODO: check if file exists already (may be the case for multiple configs in one)

    # TODO: perform ssh file transfer in python instead of subprocess
    server_name: str = config['ServerName']
    if server_name == "":
        # an empty server name means the tarball is on the local filesystem
        cmd_scp: list = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath]
    else:
        cmd_scp: list = ["scp", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no",
                         f"{config['ServerName']}:{file_path}/{file_name}.tar.gz", jobPath]

    description_short = 'env2 scp'
    description_long = 'Copying file from remote server to job directory'
    # lawrence comment in/out
    subprocess_and_log(cmd_scp,description_short, description_long)

    logger.info('untarring the input file')

    # untar incoming name data
    output_directory = f"{jobPath}/NAME_Met_as_netcdf"
    Path(output_directory).mkdir(parents=True, exist_ok=True)
    tarfile_name = f"{jobPath}/{file_name}.tar.gz"
    with tarfile.open(tarfile_name) as tar:
        # flatten member paths so files land directly in output_directory;
        # this also limits exposure to path-traversal member names
        members = remove_path_from_tar_members(tar)
        tar.extractall(output_directory, members = members)

    # basic check that contents are as expected for 7-day forecast (57 timepoints in all files)
    cube_wildcard = f"{output_directory}/*.nc"
    cubes: CubeList = iris.load(cube_wildcard)
    for cube in cubes:
        coord = cube.coord("time")
        timepoint_count = coord.shape[0]
        if timepoint_count != 57:
            msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}"
            logger.error(msg)
            raise RuntimeError(msg)

    region = config['RegionName']

    logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear")

    pipeline_config = config["Environment"]
    try:
        #todo lawrence comment this back to original (extracted=False)
        esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False)
    except Exception:
        # catch Exception rather than a bare except so KeyboardInterrupt and
        # SystemExit propagate untouched; log the traceback, then re-raise
        logger.exception("Some failure when running EnvSuitPipeline.py")
        raise

    logger.info('Finished running environmental suitability 2.0')

    # TODO: Check that the output appears as expected

    proc_out = {}
    # Output files available for upload
    proc_out['output'] = None
    # Processing files available for clearing
    proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"]

    return proc_out

def process_copy_past_job_env2_0(jobPath,status,config,component):
    '''For when we want to skip process_in_job() to test the other components of
    this script. Copies the 'processed' directory of a previous (backup) job
    into the current job directory. Currently hard-wired.

    Returns the standard processor_in_job dict with nothing available for
    upload or clearup. Raises FileNotFoundError if the backup job directory
    does not exist.'''

    # TODO: remove this hard-wired assumption
    jobPath_to_copy = f"{jobPath}/../{short_name['Environment']}_{config['StartString']}_bak/"

    # raise explicitly rather than assert: asserts are stripped under -O
    if not os.path.exists(jobPath_to_copy):
        raise FileNotFoundError(f"Backup job directory does not exist: {jobPath_to_copy}")

    dir_src = f"{jobPath_to_copy}/processed/"

    dir_dst = f"{jobPath}/processed/"

    logger.info(f"Copying from {dir_src}")

    logger.info(f"to {dir_dst}")

    # shutil.copytree replaces distutils.dir_util.copy_tree (distutils was
    # deprecated in Python 3.10 and removed in 3.12); dirs_exist_ok=True
    # mirrors copy_tree's merge-into-existing-directory behaviour
    shutil.copytree(dir_src, dir_dst, dirs_exist_ok=True)

    logger.info('Copying complete')

    proc_out = {}
    # Output files available for upload
    proc_out['output'] = None
    # Processing files available for clearing
    proc_out['clearup'] = None

    return proc_out

'''class EWSPlottingEnvSuit(EWSPlottingEnvSuitBase):

    def set_custom_params(self,
                          sys_params_dict: dict,
                          chart_params_dict: dict,
                          run_params_dict: dict,
                          disease_csv_template_arg: str,
                          diseases: List[EnvSuitDiseaseInfo]):
        # this is unique to the asia/east africa env suit, as we are not filtering within country boundaries
        run_params_dict[RUN_PARAMS.FILTER_FOR_COUNTRY_KEY] = "False"'''

#TODO test if this works
def process_EWS_plotting_env2_0(jobPath,config):
    '''Configures the plotting arguments and calls EWS-plotting as a python module.
    Returns a sorted list of output files for transfer.

    Raises RuntimeError if EWS-plotting produces no output files.'''

    logger.info('started process_EWS_plotting_env2_0()')

    main_region = config['RegionName']

    input_dir = f"{jobPath}/processed/{main_region}"

    subregions = config['SubRegionNames']

    EWSPlottingOutputGlobs = []

    # work on each region
    for region in subregions:

        output_dir = f"{jobPath}/plotting/{region.lower()}"
        # {DISEASE_DIR} is a placeholder substituted per-disease by the
        # plotting library, so it is deliberately left unexpanded here
        csv_template_dir = input_dir + "/{DISEASE_DIR}/RIE_value.csv"

        Path(output_dir).mkdir(parents=True, exist_ok=True)

        sys_config = config['Environment']['EWS-Plotting']['SysConfig']
        run_config = config['Environment']['EWS-Plotting']['RunConfig']
        chart_config = config['Environment']['EWS-Plotting'][region]['ChartConfig']
        filter_for_country = config['Environment']['EWS-Plotting'][region]['FilterForCountry']

        # Note that this runs all disease types available

        logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}")

        env_suit_processor = EnvSuitPostProcessor()
        env_suit_processor.set_param_config_files(sys_params_file_arg = sys_config,
                                                  chart_params_file_arg = chart_config,
                                                  run_params_file_arg = run_config,
                                                  es_output_dir_arg = output_dir,
                                                  issue_date_arg = config['StartString'],
                                                  disease_csv_template_arg = csv_template_dir)

        # config value is a string, so compare case-insensitively to get a bool
        env_suit_processor.run_params.FILTER_FOR_COUNTRY = (filter_for_country.upper() == "TRUE")

        # Include further diseases in plotting. In this case the irrigated suitabilite for the rusts.
        # TODO: move this part out into a config
        extra_diseases = [
            EnvSuitDiseaseInfo("Stem rust temp-only", "stem_rust_temponly", config['StartString'], "StemRust_TempOnly", csv_template_dir),
            EnvSuitDiseaseInfo("Leaf rust temp-only", "leaf_rust_temponly", config['StartString'], "LeafRust_TempOnly", csv_template_dir),
            EnvSuitDiseaseInfo("Stripe rust temp-only", "stripe_temponly", config['StartString'], "StripeRust_TempOnly", csv_template_dir)
        ]

        env_suit_processor.add_diseases(diseases=extra_diseases)

        env_suit_processor.process()

        # collect everything the plotting step wrote for this region
        EWSPlottingOutputDir = f"{output_dir}/images/"
        EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"]

    # keep only globs that match at least one existing file
    EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False)

    # check there is some output from EWS-plotting
    if not EWSPlottingOutputGlobs:
        msg = 'EWS-Plotting did not produce any output'
        logger.error(msg)
        # include the message in the exception so it survives into tracebacks
        raise RuntimeError(msg)

    # provide list for transfer
    EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)])

    return EWSPlottingOutputs