FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
Commit 9bf25b74 authored by L. Bower's avatar L. Bower
Browse files

experimenting with making processor class-based

parent 36cf0012
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
......@@ -15,10 +15,10 @@ from ProcessorAdvisory import (
process_in_job_advisory
)
from ProcessorDeposition import (
process_in_job_dep,
process_EWS_plotting_dep
)
#from ProcessorDeposition import (
# process_in_job_dep,
# process_EWS_plotting_dep
#)
from ProcessorEnvironment import (
process_in_job_env2_0,
process_copy_past_job_env2_0,
......
......@@ -10,6 +10,8 @@ from string import Template
import iris
from iris.cube import CubeList
from Processor import Processor
from ProcessorServer import process_pre_job_server_download
from ProcessorUtils import (
get_only_existing_globs,
subprocess_and_log,
......@@ -18,142 +20,166 @@ from ProcessorUtils import (
from ews_postprocessing.deposition.deposition_post_processor import DepositionPostProcessor
logger = logging.getLogger('Processor.Deposition')
add_filters_to_sublogger(logger)
class ProcessorDeposition(Processor):
def process_in_job_dep(jobPath,status,config,component):
logger.info('started process_in_job_dep()')
def process_pre_job(self, args):
    """Run the pre-job step: fetch required inputs from the remote server.

    Delegates entirely to the shared server-download routine and returns
    its result unchanged.
    """
    download_result = process_pre_job_server_download(args)
    return download_result
file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
file_name = Template(config[component]['InputFileTemplate']).substitute(**config)
logger.info(f"Expecting to work with {file_name}")
def process_in_job(self, jobPath, status, configjson, component) -> object:
    """Run the deposition in-job processing for this component.

    Thin dispatcher: forwards all arguments to process_in_job_dep() and
    returns whatever it produces.
    """
    result = self.process_in_job_dep(jobPath, status, configjson, component)
    return result
if os.path.exists(f"{jobPath}/{file_name}"):
logger.info('Directory already exists in job directory, so nothing to do here')
return
logger.info('Copying file from remote server to job directory')
def process_post_job(self, jobPath, configjson):
    """Run the post-job step: generate EWS plots for the deposition output.

    Thin dispatcher: forwards to process_EWS_plotting_dep() and returns
    its list of output files for transfer.
    """
    plotting_outputs = self.process_EWS_plotting_dep(jobPath, configjson)
    return plotting_outputs
# TODO: perform ssh file transfer in python instead of subprocess
server_name: str = config['ServerName']
if server_name == "":
cmd_scp = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath]
else:
cmd_scp = ["scp", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no",
f"{server_name}:{file_path}/{file_name}.tar.gz", jobPath]
description_short = 'dep scp'
description_long = 'scp from server to job directory'
subprocess_and_log(cmd_scp, description_short, description_long)
def __init__(self) -> None:
    """Initialise the deposition processor and configure its logger."""
    super().__init__()
    # NOTE(review): this local `logger` duplicates the module-level setup
    # (same 'Processor.Deposition' name, same add_filters_to_sublogger call
    # at module import time) and is discarded when __init__ returns; the
    # methods use `self.logger` instead, presumably set by super().__init__()
    # — confirm whether this duplication is intentional.
    logger = logging.getLogger('Processor.Deposition')
    add_filters_to_sublogger(logger)
logger.info('untarring the input file')
# TODO: untar file in python (with tarfile module) instead of subprocess
cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath]
description_short = 'dep tars'
description_long = 'untar the downloaded file'
subprocess_and_log(cmd_tar, description_short, description_long)
def process_in_job_dep(self, jobPath, status, config, component):
self.logger.info('started process_in_job_dep()')
# basic check that contents are as expected
# 132 files of NAME .txt timesteps and one summary png file
# if len(glob(f"{jobPath}/{file_name}/deposition_srcs_allregions_C1_T*.txt")) != 56:
# msg = f"Unexpect number of deposition .txt files in input tar file. Expected 56."
# logger.error(msg)
# raise RuntimeError(msg)
file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
file_name = Template(config[component]['InputFileTemplate']).substitute(**config)
# basic check that contents are as expected (56 timepoints in the file)
cube_wildcard = f"{jobPath}/{file_name}/deposition_srcs_allregions*.nc"
cubes: CubeList = iris.load(cube_wildcard)
for cube in cubes:
coord = cube.coord("time")
timepoint_count = coord.shape[0]
if timepoint_count != 56:
msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}"
logger.error(msg)
raise RuntimeError(msg)
self.logger.info(f"Expecting to work with {file_name}")
proc_out = {}
# Output files available for upload
proc_out['output'] = None
# Processing files available for clearing
proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"]
if os.path.exists(f"{jobPath}/{file_name}"):
self.logger.info('Directory already exists in job directory, so nothing to do here')
return
return proc_out
self.logger.info('Copying file from remote server to job directory')
def process_EWS_plotting_dep(jobPath,config):
'''Returns a list of output files for transfer.'''
# TODO: perform ssh file transfer in python instead of subprocess
server_name: str = config['ServerName']
if server_name == "":
cmd_scp = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath]
else:
cmd_scp = ["scp", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no",
f"{server_name}:{file_path}/{file_name}.tar.gz", jobPath]
logger.info('started process_EWS_plotting_dep()')
description_short = 'dep scp'
description_long = 'scp from server to job directory'
subprocess_and_log(cmd_scp, description_short, description_long)
# initialise environment
regions = config['SubRegionNames']
self.logger.info('untarring the input file')
deposition_file_name = Template(config['Deposition']['InputFileTemplate']).substitute(**config)
# TODO: untar file in python (with tarfile module) instead of subprocess
cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath]
description_short = 'dep tars'
description_long = 'untar the downloaded file'
subprocess_and_log(cmd_tar, description_short, description_long)
deposition_path = f"{jobPath}/{deposition_file_name}"
# basic check that contents are as expected
# 132 files of NAME .txt timesteps and one summary png file
# if len(glob(f"{jobPath}/{file_name}/deposition_srcs_allregions_C1_T*.txt")) != 56:
# msg = f"Unexpect number of deposition .txt files in input tar file. Expected 56."
# self.logger.error(msg)
# raise RuntimeError(msg)
# get the file name from the config
# this file name can be a glob, as long as matches can all be loaded by iris
deposition_data_file_name = Template(config['Deposition']['DataFileTemplate']).substitute(**config)
name_file_wildcard = f"{deposition_path}/{deposition_data_file_name}"
# basic check that contents are as expected (56 timepoints in the file)
cube_wildcard = f"{jobPath}/{file_name}/deposition_srcs_allregions*.nc"
cubes: CubeList = iris.load(cube_wildcard)
for cube in cubes:
coord = cube.coord("time")
timepoint_count = coord.shape[0]
if timepoint_count != 56:
msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}"
self.logger.error(msg)
raise RuntimeError(msg)
EWSPlottingOutputGlobs = []
proc_out = {}
# Output files available for upload
proc_out['output'] = None
# Processing files available for clearing
proc_out['clearup'] = [f"{jobPath}/{file_name}.tar.gz"]
for region in regions:
return proc_out
output_dir = f"{jobPath}/plotting/{region.lower()}"
Path(output_dir).mkdir(parents=True, exist_ok=True)
def process_EWS_plotting_dep(self, jobPath, config):
'''Returns a list of output files for transfer.'''
sys_config = config['Deposition']['EWS-Plotting']['SysConfig']
name_extraction_config = config['Deposition']['EWS-Plotting']['NameExtractionConfig']
run_config = config['Deposition']['EWS-Plotting']['RunConfig']
run_config_norm = config['Deposition']['EWS-Plotting']['RunConfigNorm']
chart_config = config['Deposition']['EWS-Plotting'][region]['ChartConfig']
normalize = config['Deposition']['EWS-Plotting'][region]['Normalize']
extraction_file_prefix = 'deposition_' + region.lower()
self.logger.info('started process_EWS_plotting_dep()')
# Note that this runs all disease types available
# initialise environment
regions = config['SubRegionNames']
logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{name_extraction_config}\n{run_config}\n{run_config_norm}\n{chart_config}")
deposition_file_name = Template(config['Deposition']['InputFileTemplate']).substitute(**config)
depo_processor = DepositionPostProcessor()
depo_processor.set_param_config_files(sys_config_file_arg = sys_config,
depo_name_extraction_config_file_arg = name_extraction_config,
chart_config_file_arg = chart_config,
depo_plotting_run_config_file_arg = run_config,
depo_plotting_normalized_run_config_file_arg = run_config_norm,
name_file_wildcard_arg = name_file_wildcard,
wheat_sources_dir_arg = deposition_path,
output_dir_arg = output_dir,
issue_date_arg = config['StartString'],
extraction_file_prefix_arg = extraction_file_prefix)
deposition_path = f"{jobPath}/{deposition_file_name}"
# get the file name from the config
# this file name can be a glob, as long as matches can all be loaded by iris
deposition_data_file_name = Template(config['Deposition']['DataFileTemplate']).substitute(**config)
name_file_wildcard = f"{deposition_path}/{deposition_data_file_name}"
# asia/east africa env suit should not perform normalization, false gets passed here for these areas
depo_processor.name_extract_params.NORMALIZE = (normalize.upper() == "TRUE")
EWSPlottingOutputGlobs = []
depo_processor.process()
for region in regions:
# check the output
EWSPlottingOutputDir = f"{output_dir}/images/"
#EWSPlottingOutputGlobs += [
# # daily plots
# f"{EWSPlottingOutputDir}Daily/deposition_{region.lower()}_*_daily_20*.png",
# # weekly plots
# f"{EWSPlottingOutputDir}Weekly/deposition_{region.lower()}_*_total_20*.png"]
output_dir = f"{jobPath}/plotting/{region.lower()}"
EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"]
Path(output_dir).mkdir(parents=True, exist_ok=True)
EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False)
sys_config = config['Deposition']['EWS-Plotting']['SysConfig']
name_extraction_config = config['Deposition']['EWS-Plotting']['NameExtractionConfig']
run_config = config['Deposition']['EWS-Plotting']['RunConfig']
run_config_norm = config['Deposition']['EWS-Plotting']['RunConfigNorm']
chart_config = config['Deposition']['EWS-Plotting'][region]['ChartConfig']
normalize = config['Deposition']['EWS-Plotting'][region]['Normalize']
extraction_file_prefix = 'deposition_' + region.lower()
# check there is some output from EWS-plotting
if not EWSPlottingOutputGlobs:
logger.error('EWS-Plotting did not produce any output')
raise RuntimeError
# Note that this runs all disease types available
# provide list for transfer
EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)])
self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{name_extraction_config}\n{run_config}\n{run_config_norm}\n{chart_config}")
return EWSPlottingOutputs
depo_processor = DepositionPostProcessor()
depo_processor.set_param_config_files(sys_config_file_arg = sys_config,
depo_name_extraction_config_file_arg = name_extraction_config,
chart_config_file_arg = chart_config,
depo_plotting_run_config_file_arg = run_config,
depo_plotting_normalized_run_config_file_arg = run_config_norm,
name_file_wildcard_arg = name_file_wildcard,
wheat_sources_dir_arg = deposition_path,
output_dir_arg = output_dir,
issue_date_arg = config['StartString'],
extraction_file_prefix_arg = extraction_file_prefix)
# asia/east africa env suit should not perform normalization, false gets passed here for these areas
depo_processor.name_extract_params.NORMALIZE = (normalize.upper() == "TRUE")
depo_processor.process()
# check the output
EWSPlottingOutputDir = f"{output_dir}/images/"
#EWSPlottingOutputGlobs += [
# # daily plots
# f"{EWSPlottingOutputDir}Daily/deposition_{region.lower()}_*_daily_20*.png",
# # weekly plots
# f"{EWSPlottingOutputDir}Weekly/deposition_{region.lower()}_*_total_20*.png"]
EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"]
EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False)
# check there is some output from EWS-plotting
if not EWSPlottingOutputGlobs:
self.logger.error('EWS-Plotting did not produce any output')
raise RuntimeError
# provide list for transfer
EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)])
return EWSPlottingOutputs
if __name__ == '__main__':
    # Script entry point: construct the processor, parse and validate the
    # command-line arguments, then execute the full processing run.
    deposition_processor = ProcessorDeposition()
    parsed_args: dict = deposition_processor.parse_and_check_args()
    deposition_processor.run_Process(parsed_args)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment